How to create a new Connector¶
Sometimes native connectors won’t cover all your needs, you will need to write your own.
A connector is an ensemble of jobs able to import and export data using a specific format. Each job is composed of several steps.
Note
For more details about these concepts, see import/export Import and Export data.
Let’s say we want to create a connector that can export CSV data (like the native one), but at the end of each export we want to notify another application. We also want the notification to contain the path to the directory of the exported file.
Note
Here we use a very simple case to have overview of the connectors, for more complex cases (like adding support for new file formats) you can refer to the next chapters.
Configure a job¶
Jobs and steps are actually Symfony services. The first thing we need is to declare a new service for our product export job:
1services:
2 acme_notifyconnector.csv_product_export_notify:
3 class: 'Akeneo\Tool\Component\Batch\Job\Job'
4 arguments:
5 - 'csv_product_export_notify' # Job name
6 - '@event_dispatcher'
7 - '@akeneo_batch.job_repository'
8 -
9 - '@pim_connector.step.csv_product.export'
10 - true # Does the job is stoppable ?
11 tags:
12 - { name: akeneo_batch.job, connector: 'Acme CSV Notify Connector', type: 'export' }
Warning
Make sure that the file containing your declaration is correctly loaded by your bundle extension. For more info please see the Symfony documentation.
As you can see there is almost no difference with the native CSV export job.
The only new info here is the name (first parameter) and the connector name (the connector
property of the tag).
How can we add our notification behaviour to this job? The simplest way is to write a new step that will be executed after the export step.
Add a new step¶
A step class needs two things: extend Akeneo\Tool\Component\Batch\Step\AbstractStep
and implement a doExecute()
method. This method will contain your custom behavior:
1<?php
2
3namespace Acme\Bundle\NotifyConnectorBundle\Step;
4
5use Akeneo\Tool\Component\Batch\Step\AbstractStep;
6use Akeneo\Tool\Component\Batch\Model\StepExecution;
7
8class NotifyStep extends AbstractStep
9{
10 protected function doExecute(StepExecution $stepExecution)
11 {
12 // inject the step execution in the step item to be able to log summary info during execution
13 $jobParameters = $stepExecution->getJobParameters();
14
15 $directory = dirname($jobParameters->get('storage')['file_path']);
16 $fields = sprintf('directory=%s', urlencode($directory));
17 $url = $jobParameters->get('urlToNotify');
18
19 $ch = curl_init();
20 curl_setopt($ch, CURLOPT_URL, $url);
21 curl_setopt($ch, CURLOPT_POST, true);
22 curl_setopt($ch, CURLOPT_POSTFIELDS, $fields);
23
24 if (false !== curl_exec($ch)) {
25 $stepExecution->addSummaryInfo('notified', 'yes');
26 } else {
27 $stepExecution->addSummaryInfo('notified', 'no');
28 $stepExecution->addError('Failed to call target URL: '.curl_error($ch));
29 }
30
31 curl_close($ch);
32 }
33}
We can now declare the step as a service:
1services:
2 acme_notifyconnector.step.notify:
3 class: 'Acme\Bundle\NotifyConnectorBundle\Step\NotifyStep'
4 arguments:
5 - 'notify' # The step name
6 - '@event_dispatcher'
7 - '@akeneo_batch.job_repository'
And add it to the job we previously declared:
1services:
2 acme_notifyconnector.csv_product_export_notify:
3 class: 'Akeneo\Tool\Component\Batch\Job\Job'
4 arguments:
5 - 'csv_product_export_notify' # Job name
6 - '@event_dispatcher'
7 - '@akeneo_batch.job_repository'
8 -
9 - '@pim_connector.step.csv_product.export'
10 - '@acme_notifyconnector.step.notify' # You new custom step
11 - true # Does the job is stoppable ?
12 tags:
13 - { name: akeneo_batch.job, connector: 'Acme CSV Notify Connector', type: 'export' }
Tip
Thanks to Symfony’s dependency injection, it’s quite easy to reuse a step for several jobs. For example, our notification step can be added to any export job just by putting it in the job service declaration.
Configure a job instance¶
A job can be seen as a template, it cannot be executed on its own: it needs parameters.
For example our new job needs file_path
and urlToNotify
parameters to work properly (plus the ones needed by the native export step).
Each set of parameters for a given job is called a job instance.
A job instance can be executed, modified or deleted using the UI or the akeneo:batch:*
Symfony commands.
A job also needs a way to get default values for parameters and a way to validate this parameters.
Let’s write it! For convenience reasons we can use the same class for both roles, it must then implement both Akeneo\Tool\Component\Batch\Job\JobParameters\DefaultValuesProviderInterface
and Akeneo\Tool\Component\Batch\Job\JobParameters\ConstraintCollectionProviderInterface
.
We want also to keep the default values and validation constraints needed by the native export step. The easiest way to do that is to use the decoration pattern:
1<?php
2
3namespace Acme\Bundle\NotifyConnectorBundle\JobParameters;
4
5use Akeneo\Tool\Component\Batch\Job\JobInterface;
6use Akeneo\Tool\Component\Batch\Job\JobParameters\ConstraintCollectionProviderInterface;
7use Akeneo\Tool\Component\Batch\Job\JobParameters\DefaultValuesProviderInterface;
8use Symfony\Component\Validator\Constraints\Collection;
9use Symfony\Component\Validator\Constraints\Url;
10
11class ProductCsvExportNotify implements
12 ConstraintCollectionProviderInterface,
13 DefaultValuesProviderInterface
14{
15 /** @var DefaultValuesProviderInterface */
16 private $baseDefaultValuesProvider;
17
18 /** @var ConstraintCollectionProviderInterface */
19 private $baseConstraintCollectionProvider;
20
21 /** @var string[] */
22 private $supportedJobNames;
23
24 /**
25 * @param DefaultValuesProviderInterface $baseDefaultValuesProvider
26 * @param ConstraintCollectionProviderInterface $baseConstraintCollectionProvider
27 * @param string[] $supportedJobNames
28 */
29 public function __construct(
30 DefaultValuesProviderInterface $baseDefaultValuesProvider,
31 ConstraintCollectionProviderInterface $baseConstraintCollectionProvider,
32 array $supportedJobNames
33 ) {
34 $this->baseDefaultValuesProvider = $baseDefaultValuesProvider;
35 $this->baseConstraintCollectionProvider = $baseConstraintCollectionProvider;
36 $this->supportedJobNames = $supportedJobNames;
37 }
38
39 /**
40 * {@inheritdoc}
41 */
42 public function getDefaultValues()
43 {
44 return array_merge(
45 $this->baseDefaultValuesProvider->getDefaultValues(),
46 ['urlToNotify' => 'http://']
47 );
48 }
49
50 /**
51 * {@inheritdoc}
52 */
53 public function getConstraintCollection()
54 {
55 $baseConstraints = $this->baseConstraintCollectionProvider->getConstraintCollection();
56 $constraintFields = array_merge(
57 $baseConstraints->fields,
58 ['urlToNotify' => new Url()]
59 );
60
61 return new Collection(['fields' => $constraintFields]);
62 }
63
64 /**
65 * {@inheritdoc}
66 */
67 public function supports(JobInterface $job)
68 {
69 return in_array($job->getName(), $this->supportedJobNames);
70 }
71}
Tip
If the job doesn’t need any particular parameters, it’s possible to use directly the classes Akeneo\Tool\Component\Batch\Job\JobParameters\EmptyDefaultValuesProvider
and Akeneo\Tool\Component\Batch\Job\JobParameters\EmptyConstraintCollectionProvider
.
1services:
2 acme_notifyconnector.job.job_parameters.csv_product_export_notify:
3 class: 'Acme\Bundle\NotifyConnectorBundle\JobParameters\ProductCsvExportNotify'
4 arguments:
5 - '@pim_connector.job.job_parameters.default_values_provider.product_csv_export'
6 - '@pim_connector.job.job_parameters.constraint_collection_provider.product_csv_export'
7 - ['%acme_notifyconnector.job_name.csv_product_export_notify%']
8 tags:
9 - { name: akeneo_batch.job.job_parameters.constraint_collection_provider }
10 - { name: akeneo_batch.job.job_parameters.default_values_provider }
Your job instances parameters can now be populated by default and validated.
Create a job instance via the command¶
We can create an instance with the following command:
php bin/console cache:clear
# akeneo:batch:create-job <connector> <job> <type> <code> <config> [<label>]
php bin/console akeneo:batch:create-job 'Acme CSV Notify Connector' csv_product_export_notify export my_app_product_export '{"urlToNotify": "http://my-app.com/product-export-done"}'
You can also list the existing job instances with the following command:
php bin/console akeneo:batch:list-jobs
Execute our new job instance¶
You can run the job with the following command:
php bin/console akeneo:batch:job my_app_product_export
[2017-04-18 18:43:55] batch.DEBUG: Job execution starting: startTime=, endTime=, updatedTime=, status=2, exitStatus=[UNKNOWN] , exitDescription=[], job=[my_app_product_export] [] []
[2017-04-18 18:43:55] batch.INFO: Step execution starting: id=0, name=[export], status=[2], exitCode=[EXECUTING], exitDescription=[] [] []
[2017-04-18 18:43:55] batch.DEBUG: Step execution success: id= 42 [] []
[2017-04-18 18:43:55] batch.DEBUG: Step execution complete: id=42, name=[export], status=[1], exitCode=[EXECUTING], exitDescription=[] [] []
[2017-04-18 18:43:55] batch.INFO: Step execution starting: id=0, name=[notify], status=[2], exitCode=[EXECUTING], exitDescription=[] [] []
[2017-04-18 18:43:55] batch.DEBUG: Step execution success: id= 43 [] []
[2017-04-18 18:43:55] batch.DEBUG: Step execution complete: id=43, name=[notify], status=[1], exitCode=[EXECUTING], exitDescription=[] [] []
[2017-04-18 18:43:55] batch.DEBUG: Upgrading JobExecution status: startTime=2017-04-18T16:43:55+00:00, endTime=, updatedTime=, status=3, exitStatus=[UNKNOWN] , exitDescription=[], job=[my_app_product_export] [] []
Export my_app_product_export has been successfully executed.
The --config
option can be used to override the job instance parameters at runtime, for instance, to change the file path:
php bin/console akeneo:batch:publish-job-to-queue csv_product_import -c '{"storage":{"type":"local","file_path":"/test.csv"}}'
Warning
In production, use this command instead:
php bin/console akeneo:batch:publish-job-to-queue my_app_product_export --env=prod
One daemon or several daemon processes have to be started to execute the jobs. Please follow the documentation Setting up the job queue daemon if it’s not the case.
Configure the UI for our new job¶
At this point the job instance is usable in command line, but it cannot be configured via the UI.
Since our job is based on the native Product CSV export job, we can copy and paste the native configuration files, then customize it.
We need to provide a form name to the frontend to be able to render it properly. If your connector doesn’t require extra fields, you can use the basic forms shipped with Akeneo. There are actually two forms for each job: one for edit mode and one for view mode. This way we can tune very finely what is displayed for each mode.
For our form we’ll need to copy:
vendor/akeneo/pim-community-dev/src/Akeneo/Platform/Bundle/UIBundle/Resources/config/form_extensions/job_instance/csv_product_export_edit.yml
tosrc/Acme/Bundle/NotifyConnectorBundle/Resources/config/form_extensions/job_instance/csv_product_export_notify_edit.yml
vendor/akeneo/pim-community-dev/src/Akeneo/Platform/Bundle/UIBundle/Resources/config/form_extensions/job_instance/csv_product_export_show.yml
tosrc/Acme/Bundle/NotifyConnectorBundle/Resources/config/form_extensions/job_instance/csv_product_export_notify_show.yml
Now replace all occurrence of csv-product-export
in these files by, let’s say, csv-product-export-notify
.
Indeed, each key in form configuration files must be unique across the whole application.
Note
We are aware that this is not an ideal solution and we’re working on a more satisfactory way to handle relations between forms. If you have any idea feel free to propose it or even write a contribution!
Now we need to declare a provider to link your job to the right form root:
1services:
2 acme_notifyconnector.provider.form.job_instance:
3 class: 'Akeneo\Platform\Bundle\ImportExportBundle\Provider\Form\JobInstanceFormProvider'
4 arguments:
5 -
6 csv_product_export_notify: pim-job-instance-csv-product-export-notify
7 tags:
8 - { name: pim_enrich.provider.form }
Tip
Of course, if your job doesn’t require any extra fields you don’t need to use a specific form configuration.
Just specify the root of the native form in your provider (that would be pim-job-instance-csv-product-export
in our case).
Add a new field to the job instance form¶
For now we have the same form for our job than the native one. We still need to add a field to be able to configure the target URL.
To do that, we need to register a new view in our form, representing the new field:
1 # src/Acme/Bundle/NotifyConnectorBundle/Resources/config/form_extensions/csv_product_export_notify_edit.yml
2 pim-job-instance-csv-product-export-notify-edit-properties-url-to-notify:
3 module: pim/job/common/edit/field/text
4 parent: pim-job-instance-csv-product-export-notify-edit-properties
5 position: 190
6 targetZone: properties
7 config:
8 fieldCode: configuration.urlToNotify
9 readOnly: false
10 label: acme.form.job_instance.tab.properties.url_to_notify.title
11 tooltip: acme.form.job_instance.tab.properties.url_to_notify.help
1 # src/Acme/Bundle/NotifyConnectorBundle/Resources/config/form_extensions/csv_product_export_notify_show.yml
2 pim-job-instance-csv-product-export-notify-show-properties-url-to-notify:
3 module: pim/job/common/edit/field/text
4 parent: pim-job-instance-csv-product-export-notify-show-properties
5 position: 190
6 targetZone: properties
7 config:
8 fieldCode: configuration.urlToNotify
9 readOnly: true
10 label: acme.form.job_instance.tab.properties.url_to_notify.title
Job form fields need special properties defined under the config
key:
fieldCode: The path to the data inside the form model. It’s usually
configuration.myParam
, withmyParam
being the key you use in the default values provider, constraint collection provider, and in your custom steps.readOnly: Is this field in read only mode?
label: The translation key for the field label.
tooltip: The translation key for the help tooltip.
Note
Here we used the very simple text field for our needs (pim/job/common/edit/field/text
module).
You can also use other fields natively available in the PIM or, if you have more specific needs, create your own field.
Now we can create and edit job instances via the UI using the menu “Spread > Export profiles” then “Create export profile” button.
Add a tab to the job edit form¶
Let’s say that we would like to add a custom tab to our job edit form in order to manage field mappings.
First, we need to create a Form extension in our bundle:
1'use strict';
2/*
3 * /src/Acme/Bundle/EnrichBundle/Resources/public/js/job/product/edit/mapping.js
4 */
5define(['pim/form'],
6 function (BaseForm) {
7 return BaseForm.extend({
8 configure: function () {
9 this.trigger('tab:register', {
10 code: this.code,
11 isVisible: this.isVisible.bind(this),
12 label: 'Mapping'
13 });
14
15 return BaseForm.prototype.configure.apply(this, arguments);
16 },
17 render: function () {
18 this.$el.html('Hello world');
19
20 return this;
21 },
22 isVisible: function () {
23 return true;
24 }
25 });
26 }
27);
For now this is a dummy extension, but this is a good start!
Let’s register this file in the requirejs
configuration
1# /src/Acme/Bundle/EnrichBundle/Resources/config/requirejs.yml
2
3config:
4 paths:
5 pim/job/product/edit/mapping: acmeenrich/js/job/product/edit/mapping
Now that our file is registered in requirejs
configuration, we can add this extension to the product edit form:
1# /src/Acme/Bundle/EnrichBundle/Resources/config/form_extensions/job_instance/csv_product_export_edit.yml
2
3extensions:
4 pim-job-instance-csv-product-export-notify-edit-mapping: # The form extension code (can be whatever you want)
5 module: pim/job/product/edit/mapping # The requirejs module we just created
6 parent: pim-job-instance-csv-product-export-notify-edit-tabs # The parent extension in the form where we want to be registered
7 aclResourceId: pim_importexport_export_profile_mapping_edit # The user will need this ACL for this extension to be registered
8 targetZone: container
9 position: 140 # The extension position
10 config:
11 tabTitle: acme_enrich.form.job_instance.tab.mapping.title
12 tabCode: pim-job-instance-mapping
13
14
15# /src/Acme/Bundle/EnrichBundle/Resources/config/form_extensions/job_instance/csv_product_export_show.yml
16
17extensions:
18 pim-job-instance-csv-product-export-notify-show-mapping: # The form extension code (can be whatever you want)
19 module: pim/job/product/show/mapping # The requirejs module we just created
20 parent: pim-job-instance-csv-product-export-notify-show-tabs # The parent extension in the form where we want to be registered
21 aclResourceId: pim_importexport_export_profile_mapping_show # The user will need this ACL for this extension to be registered
22 targetZone: container
23 position: 140 # The extension position
24 config:
25 tabTitle: acme_enrich.form.job_instance.tab.mapping.title
26 tabCode: pim-job-instance-mapping
To see your changes in the new tab in the job edit form you need to run:
bin/console cache:clear
bin/console --env=prod pim:installer:assets --symlink --clean
yarn run webpack
If you don’t see your changes, make sure you have run (bin/console assets:install --symlink web
).
Make a job stoppable¶
In Akeneo 5.0 we added the possibility to declare a Job as stoppable. But as some task may be not stoppable (important workflows that always need to finish, job depending on other steps, etc) we decided to make this new feature “opt in”. In this documentation, we will explain you how to make your job stoppable.
Note
If your job is based on the Akeneo\Tool\Component\Batch\Job\Job
class, you have to adapt your job service definition by setting the last property of the constructor to “true” (as shown in the example below).
Note
If your job is composed of an Akeneo\Tool\Component\Batch\Step\ItemStep
instance, you should not have to do anything regarding the steps of your jobs as the stopping mechanism is already built in the ItemStep class.
To make this action available from the UI, you need to make sure of a few things:
You need to make sure your custom job implements
Akeneo\Tool\Component\Batch\Job\StoppableJobInterface
and set the last property of the constructor to “true”. See the following job service definition example:
1pimee_catalog_rule.job.xlsx_product_import_with_rules:
2 class: 'Akeneo\Tool\Component\Batch\Job\Job'
3 arguments:
4 - '%pimee_catalog_rule.job_name.xlsx_product_import_with_rules%'
5 - '@event_dispatcher'
6 - '@akeneo_batch.job_repository'
7 -
8 - '@pim_connector.step.charset_validator'
9 - '@pimee_catalog_rule.step.xlsx_product.import'
10 - '@pimee_catalog_rule.step.xlsx_product.import_associations'
11 - '@pimee_catalog_rule.step.xlsx_product.execute_rules'
12 - true # <-- This property should be true if the job is stoppable
13 tags:
14 - { name: akeneo_batch.job, connector: '%pim_connector.connector_name.xlsx%', type: '%pim_connector.job.import_type%' }
If your job uses tasklets (by implementing
TaskletInterface
) to execute your business logic. You can inject the service “akeneo_batch.job.job_stopper” directly in your custom tasklets and use the function JobStopper::isStopping to know if user asked to stop the job. If this function return true, you should use the function JobStopper::stop and exit your Tasklet.
The following example shows a simple tasklet able to stop when a user stops the job from the UI.
1<?php
2declare(strict_types=1);
3
4namespace Acme\Bundle\StoppableJobBundle;
5
6class TrackableTasklet implements TaskletInterface
7{
8 private const BATCH_SIZE = 100;
9
10 protected ?StepExecution $stepExecution = null;
11 protected FindItemsToProcess $findItemsToProcess;
12 protected JobStopper $jobStopper;
13
14 public function __construct(
15 FindItemsToProcess $findItemsToProcess,
16 JobStopper $jobStopper
17 ) {
18 $this->jobStopper = $jobStopper;
19 $this->findItemsToProcess = $findItemsToProcess;
20 }
21
22 public function setStepExecution(StepExecution $stepExecution): void
23 {
24 $this->stepExecution = $stepExecution;
25 }
26
27 public function execute(): void
28 {
29 $itemsToProcess = $this->FindItemsToProcess->find();
30 foreach ($itemsToProcess as $i => $itemToProcess) {
31 $itemToProcess->doSomeWork();
32
33 // Check every 100 items if the process should be stopped
34 if ($i % self::BATCH_SIZE === 0
35 && $this->jobStopper->isStopping($this->stepExecution)
36 ) {
37 $this->jobStopper->stop($this->stepExecution);
38
39 return;
40 }
41 }
42 }
43}
Track the progress of a job¶
In Akeneo 5.0 we added the possibility to track the progress of job. But as some task may be not trackable we decided to make this new feature “opt in”. In this documentation, we will explain you how to expose the progress of your job.
Note
If your job is composed of an Akeneo\Tool\Component\Batch\Step\ItemStep
instance, as well as a reader implementing Akeneo\Tool\Component\Batch\Item\TrackableItemReaderInterface
the tracking of your job should already be available in the UI.
If your job uses a custom reader, make sure it implements Akeneo\Tool\Component\Batch\Item\TrackableItemReaderInterface
and exposes the total of number of items that will be processed during the execution of the step.
If your job uses a custom tasklet, we need to make sure of a few additional things:
your tasklet should implement the
Akeneo\Tool\Component\Batch\Item\TrackableTaskletInterface
interfaceat the very beginning of the execution of the tasklet, you need to provide the step execution with the total items your tasklet will process through the
Akeneo\Tool\Component\Batch\Model\StepExecution::setTotalItems
function.during the process of the tasklet, you need to provide the step execution with the progression the tasklet by incrementing a counter through the
Akeneo\Tool\Component\Batch\Model\StepExecution::incrementProcessedItems
function.
The following example shows a simple tasklet updating it’s progress using the step execution.
1<?php
2declare(strict_types=1);
3
4namespace Acme\Bundle\StoppableJobBundle;
5
6class TrackableTasklet implements TaskletInterface, TrackableTaskletInterface
7{
8 protected ?StepExecution $stepExecution = null;
9 protected FindItemsToProcess $findItemsToProcess;
10 protected JobRepositoryInterface $jobRepository;
11
12 public function __construct(
13 FindItemsToProcess $findItemsToProcess,
14 JobRepositoryInterface $jobRepository
15 ) {
16 $this->findItemsToProcess = $findItemsToProcess;
17 $this->jobRepository = $jobRepository;
18 }
19
20 public function setStepExecution(StepExecution $stepExecution): void
21 {
22 $this->stepExecution = $stepExecution;
23 }
24
25 public function execute(): void
26 {
27 // First, let's calculate the total items to process
28 $itemsToProcess = $this->findItemsToProcess->find();
29 $this->stepExecution->setTotalItems($itemsToProcess->count());
30
31 // then, start to process entities
32 // and update the step execution with the progress
33 foreach ($itemsToProcess as $itemToProcess) {
34 $itemToProcess->doSomeWork();
35 $this->stepExecution->incrementProcessedItems();
36 $this->jobRepository->updateStepExecution($this->stepExecution);
37 }
38 }
39
40 public function isTrackable(): bool
41 {
42 return true;
43 }
44}
Warning
Make sure to only call JobStopper::isStopped not too often as it will do a MySQL query. A good way of doing it could be to do it only every 100 loop tour.
Note
You can refer to the Akeneo\Pim\Enrichment\Component\Product\Job\DeleteProductsAndProductModelsTasklet
to see how the tracking is implemented and how the step execution is kept up to date with the progression.
Found a typo or a hole in the documentation and feel like contributing?
Join us on Github!