zepgram / module-multi-threading


This module is a powerful tool for developers who want to process large data sets in a short amount of time

  • Benjamin Calef
magento2-module Compatibility: 2.4.7-2.4.8 Code Quality: Fail Tests: N/A Security: Pass MIT

Multi-Threading for Magento 2

This module is a powerful tool for developers who want to process large data sets in
a short amount of time. It allows you to process large collections of data in parallel
using multiple child processes, improving performance and reducing processing time.

Installation

composer require zepgram/module-multi-threading
bin/magento module:enable Zepgram_MultiThreading
bin/magento setup:upgrade

Usage

These classes allow you to process search criteria, a collection or an array using multi-threading.

ForkedSearchResultProcessor

use Magento\Catalog\Api\ProductRepositoryInterface;
use Magento\Framework\Api\SearchCriteriaBuilder;
use Zepgram\MultiThreading\Model\ForkedSearchResultProcessor;

class MyAwesomeClass
{
    /** @var ForkedSearchResultProcessor */
    private $forkedSearchResultProcessor;

    /** @var ProductRepositoryInterface */
    private $productRepository;

    /** @var SearchCriteriaBuilder */
    private $searchCriteriaBuilder;

    public function __construct(
        ForkedSearchResultProcessor $forkedSearchResultProcessor,
        ProductRepositoryInterface $productRepository,
        SearchCriteriaBuilder $searchCriteriaBuilder
    ) {
        $this->forkedSearchResultProcessor = $forkedSearchResultProcessor;
        $this->productRepository = $productRepository;
        $this->searchCriteriaBuilder = $searchCriteriaBuilder;
    }

    public function execute(): void
    {
        $searchCriteria = $this->searchCriteriaBuilder->create();

        // called in a child process for each item of each page
        $callback = function ($item) {
            $item->getData();
            // do your business logic here
        };

        $this->forkedSearchResultProcessor->process(
            $searchCriteria,
            $this->productRepository, // repository used to fetch each page
            $callback,
            1000, // $pageSize
            10,   // $maxChildrenProcess
            true  // $isIdempotent
        );
    }
}

ForkedCollectionProcessor

use Magento\Catalog\Model\ResourceModel\Product\CollectionFactory;
use Zepgram\MultiThreading\Model\ForkedCollectionProcessor;

class MyAwesomeClass
{
    /** @var ForkedCollectionProcessor */
    private $forkedCollectionProcessor;

    /** @var CollectionFactory */
    private $collectionFactory;

    public function __construct(
        ForkedCollectionProcessor $forkedCollectionProcessor,
        CollectionFactory $collectionFactory
    ) {
        $this->forkedCollectionProcessor = $forkedCollectionProcessor;
        $this->collectionFactory = $collectionFactory;
    }

    public function execute(): void
    {
        // build the collection but do not load it: the processor splits it into pages
        $collection = $this->collectionFactory->create();
        $callback = function ($item) {
            $item->getData();
            // do your business logic here
        };

        $this->forkedCollectionProcessor->process(
            $collection,
            $callback,
            1000, // $pageSize
            10,   // $maxChildrenProcess
            true  // $isIdempotent
        );
    }
}

ForkedArrayProcessor

This class allows you to process an array of data using multi-threading.

use Zepgram\MultiThreading\Model\ForkedArrayProcessor;

class MyAwesomeClass
{
    /** @var ForkedArrayProcessor */
    private $forkedArrayProcessor;

    public function __construct(ForkedArrayProcessor $forkedArrayProcessor)
    {
        $this->forkedArrayProcessor = $forkedArrayProcessor;
    }

    public function execute(): void
    {
        $array = [1, 2, 3, 4, 5]; // any list of items to process
        $callback = function ($item) {
            echo $item;
            // do your business logic here
        };

        $this->forkedArrayProcessor->process(
            $array,
            $callback,
            2, // $pageSize
            2  // $maxChildrenProcess
        );
    }
}

ParallelStoreProcessor or ParallelWebsiteProcessor

use Magento\Catalog\Model\ResourceModel\Product\CollectionFactory;
use Magento\Store\Api\Data\StoreInterface;
use Zepgram\MultiThreading\Model\Dimension\ParallelStoreProcessor;

class MyAwesomeClass
{
    /** @var ParallelStoreProcessor */
    private $parallelStoreProcessor;

    /** @var CollectionFactory */
    private $collectionFactory;

    public function __construct(
        ParallelStoreProcessor $parallelStoreProcessor,
        CollectionFactory $collectionFactory
    ) {
        $this->parallelStoreProcessor = $parallelStoreProcessor;
        $this->collectionFactory = $collectionFactory;
    }

    public function execute(): void
    {
        $callback = function (StoreInterface $store) {
            // build a store-scoped query (do not load the whole collection at once!)
            $collection = $this->collectionFactory->create();
            $collection->addFieldToFilter('type_id', 'simple')
                ->addFieldToSelect(['sku', 'description', 'created_at'])
                ->setStoreId($store->getId())
                ->addStoreFilter($store->getId())
                ->distinct(true);

            // paginate manually to keep memory usage bounded
            $currentPage = 1;
            $pageSize = 1000;
            $collection->setPageSize($pageSize);
            $totalPages = $collection->getLastPageNumber();
            while ($currentPage <= $totalPages) {
                $collection->clear();
                $collection->setCurPage($currentPage);
                foreach ($collection->getItems() as $product) {
                    // do your business logic here
                }
                $currentPage++;
            }
        };

        // the callback runs once per store, each store in a dedicated child process
        $this->parallelStoreProcessor->process(
            $callback,
            null,  // $maxChildrenProcess
            true,  // $onlyActiveStores
            false  // $withDefaultStore
        );
    }
}

bin/magento thread:processor command

This command runs an existing command indefinitely in a dedicated child process,
using the Symfony Process component.

bin/magento thread:processor <command_name> [command_args...] [--timeout=<timeout>] [--iterations=<iterations>] [--delay=<delay>] [--environment=<environment>] [--progress] [--fail-on-loop] [--ignore-exit-code]

Options

  • timeout: process timeout in seconds (default: 300)
  • iterations: number of iterations (default: 0)
  • delay: delay in milliseconds between two iterations (default: 0)
  • environment: environment variables to set, separated by commas
  • progress: show a progress bar while the command runs
  • fail-on-loop: stop the iteration loop at the first failure
  • ignore-exit-code: return success even when some iterations fail (a warning summary is logged)

How it works

The thread:processor command creates a dedicated child process to execute an existing command.
The child process runs the command specified by the user, while the parent process
monitors it: it streams the child's output and checks its exit code. You can define iterations
to execute the same command multiple times, with a dedicated child process for each execution.
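The iteration loop can be pictured in plain PHP. This is a sketch under stated assumptions, not the module's implementation: the module uses the Symfony Process component, whereas this sketch uses PHP's built-in proc_open() to stay dependency-free, and the function runIterations() and its parameters are hypothetical. Each iteration starts a brand-new child process, the child writes directly to the parent's output, and the loop optionally stops at the first failure, mirroring --iterations, --delay and --fail-on-loop.

```php
<?php
/**
 * Hypothetical sketch: run $command $iterations times, one fresh child process per run.
 * Uses proc_open() instead of Symfony Process (assumption made to avoid dependencies).
 *
 * @param list<string> $command    command and its arguments, e.g. ['bin/magento', 'cache:clean']
 * @param int          $iterations number of runs
 * @param int          $delayMs    pause between two runs, in milliseconds
 * @param bool         $failOnLoop stop after the first non-zero exit code
 * @return list<int> exit code of every iteration that was executed
 */
function runIterations(array $command, int $iterations, int $delayMs, bool $failOnLoop): array
{
    $exitCodes = [];
    for ($i = 0; $i < $iterations; $i++) {
        if ($i > 0 && $delayMs > 0) {
            usleep($delayMs * 1000); // the delay is expressed in milliseconds
        }
        // The child inherits the parent's stdout/stderr, so its output appears while it runs.
        $process = proc_open($command, [1 => STDOUT, 2 => STDERR], $pipes);
        if (!is_resource($process)) {
            throw new RuntimeException('Unable to start the child process');
        }
        $exitCode = proc_close($process); // waits for the child and returns its exit code
        $exitCodes[] = $exitCode;
        if ($failOnLoop && $exitCode !== 0) {
            break; // behaves like --fail-on-loop
        }
    }
    return $exitCodes;
}
```

For example, runIterations(['bin/magento', 'cache:clean'], 3, 500, true) would clean the cache three times, half a second apart, stopping early if one run fails.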

By default, the wrapper returns a non-zero exit code when at least one iteration fails.
This keeps monitoring behavior consistent with real command outcomes.
If your workflow is retry-driven and you want eventual consistency over strict exit codes,
use --ignore-exit-code to return success with a warning summary.
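The exit-code policy reduces to a small decision. The helper below is hypothetical (resolveExitCode() is an illustrative name, not part of the module): it takes the exit codes of all executed iterations and returns the wrapper's final exit code, together with the warning summary that is emitted when failures occur. Returning 1 for a failure is an assumption; the text above only guarantees a non-zero code.

```php
<?php
/**
 * Hypothetical helper: derive the wrapper's exit code from the exit codes of its iterations.
 *
 * @param list<int> $exitCodes      exit code of every executed iteration
 * @param bool      $ignoreExitCode behaves like --ignore-exit-code
 * @return array{int, ?string} final exit code and an optional warning summary
 */
function resolveExitCode(array $exitCodes, bool $ignoreExitCode): array
{
    $failures = count(array_filter($exitCodes, static fn (int $code): bool => $code !== 0));
    if ($failures === 0) {
        return [0, null];
    }
    $summary = sprintf('%d of %d iteration(s) failed', $failures, count($exitCodes));
    // Default: surface the failure (1 is an assumed non-zero value).
    // With --ignore-exit-code: report success but keep the warning summary.
    return $ignoreExitCode ? [0, $summary] : [1, $summary];
}
```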

The ForkedSearchResultProcessor, ForkedCollectionProcessor and ForkedArrayProcessor classes
use a similar approach to process search criteria, a collection or an array. The data set is
divided into pages, and for each page a child process is created to run the user-specified
callback on every item of that page.
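The page-per-child approach, together with the $pageSize and $maxChildrenProcess parameters, can be sketched in plain PHP with pcntl_fork(). This is a simplified illustration, not the module's ForkedProcessor: forkPerPage() is a hypothetical name, pages come from array_chunk(), and a child reports failure only through its exit code. The parent never runs more than $maxChildren children at once and records which pages failed.

```php
<?php
/**
 * Hypothetical sketch of fork-per-page processing (requires ext-pcntl, CLI, not Windows).
 *
 * @return list<int> indexes of the pages whose child process failed
 */
function forkPerPage(array $items, callable $callback, int $pageSize, int $maxChildren): array
{
    if ($pageSize <= 0 || $maxChildren <= 0) {
        throw new InvalidArgumentException('pageSize and maxChildren must be greater than 0');
    }
    $running = [];     // pid => page index of every live child
    $failedPages = [];

    // Block until one child exits, then record whether its page succeeded.
    $reapOne = static function () use (&$running, &$failedPages): void {
        $pid = pcntl_wait($status);
        if ($pid <= 0) {
            // Guard against looping forever when there is no child to wait for.
            throw new RuntimeException('pcntl_wait() failed');
        }
        if (!array_key_exists($pid, $running)) {
            return; // not a child started by this function
        }
        $page = $running[$pid];
        unset($running[$pid]);
        if (!pcntl_wifexited($status) || pcntl_wexitstatus($status) !== 0) {
            $failedPages[] = $page;
        }
    };

    foreach (array_chunk($items, $pageSize) as $page => $pageItems) {
        while (count($running) >= $maxChildren) {
            $reapOne(); // wait for a free slot before forking again
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('pcntl_fork() failed');
        }
        if ($pid === 0) {
            // Child process: handle one page, then report success or failure via exit code.
            try {
                foreach ($pageItems as $item) {
                    $callback($item);
                }
            } catch (Throwable $e) {
                exit(1);
            }
            exit(0);
        }
        $running[$pid] = $page; // parent keeps track of the child
    }
    while ($running !== []) {
        $reapOne(); // wait for the remaining children
    }
    sort($failedPages);
    return $failedPages;
}
```

Passing 1 as $maxChildren makes the parent wait for each child before forking the next one, which is the no-parallelization case described for $maxChildrenProcess.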

The ParallelStoreProcessor and ParallelWebsiteProcessor classes are designed to make it easier
to process a list of stores or websites in parallel. To use either of these classes, you'll need to
provide a callback function that will be called for each store or website in the list. The callback
function should take one parameter, which will be a single store or website object.

Each store or website is passed to the callback function in a separate process,
which shortens the overall processing time.
The number of child processes cannot exceed the number of stores or websites: for example,
with 10 stores, at most 10 child processes can run in parallel.
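The cap on child processes, together with the 0.3.0 changelog rules (reject a maxChildrenProcess of 0 or less, return early when there are no stores or websites), can be expressed as a small function. effectiveChildren() is a hypothetical name, and treating null as "no explicit cap" is an assumption; the module's actual handling of null is not described here.

```php
<?php
/**
 * Hypothetical helper: how many children a store/website processor may run in parallel.
 * null is treated as "no explicit cap" (assumption; not defined by the module docs).
 */
function effectiveChildren(?int $maxChildrenProcess, int $targetCount): int
{
    if ($maxChildrenProcess !== null && $maxChildrenProcess <= 0) {
        throw new InvalidArgumentException('maxChildrenProcess must be greater than 0');
    }
    if ($targetCount === 0) {
        return 0; // nothing to process: return early, no child is forked
    }
    // At most one child per store/website, so the cap never exceeds the number of targets.
    return $maxChildrenProcess === null ? $targetCount : min($maxChildrenProcess, $targetCount);
}
```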

Here is a breakdown of the parameters:

  • $collection/$searchCriteria/$array: The first parameter is the data source:
    a Magento\Framework\Api\SearchCriteriaInterface for ForkedSearchResultProcessor,
    a Magento\Framework\Data\Collection for ForkedCollectionProcessor, or an array
    for ForkedArrayProcessor.

  • $callback: A callable executed on each item of the data source. It receives the
    current item and should contain the business logic to run on that item.

  • $pageSize: This parameter is used to set the number of items per page.
    It is used to paginate the collection so that it can be processed in smaller chunks.

  • $maxChildrenProcess: The maximum number of child processes that can run
    simultaneously, which controls the degree of parallelism. If set to 1, there is no
    parallelization: the parent process waits for each child process to finish before
    creating the next one.

  • reconnectDatabaseInChild (constructor argument on ForkedProcessor): disabled by default.
    Keep this disabled when the parent process relies on MySQL connection-scoped temporary table
    state. This is not specific to mview: it also applies to workloads that depend on temporary
    tables that are explicit, implicit, or otherwise virtual to the current DB session. If your
    workload requires fresh child DB connections, you can enable it explicitly through DI.

    Example (etc/di.xml):

    <type name="Zepgram\MultiThreading\Model\Processor\ForkedProcessor">
        <arguments>
            <argument name="reconnectDatabaseInChild" xsi:type="boolean">true</argument>
        </arguments>
    </type>
    

    Use true for standalone forked workloads where child processes should always open fresh DB
    connections. Keep false when processing logic depends on DB session-scoped temporary state.

  • $isIdempotent: A flag, true by default, relevant to ForkedSearchResultProcessor and
    ForkedCollectionProcessor when $maxChildrenProcess is greater than one.
    The native pagination relies on LIMIT/OFFSET and expects a single process to page through
    a result set that does not change. If your callback modifies the columns used by the query,
    it changes the result set of the initial query: rows drop out of (or shift within) the
    result, so the OFFSET of later pages no longer points at the intended rows and items can be
    skipped. To avoid that, set $isIdempotent to false.

    E.g.: your collection query requests all disabled products, and your callback enables and
    saves them in the database. Because you are modifying the very column the collection filters
    on, your query is not idempotent.
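Why a non-idempotent query breaks OFFSET pagination can be shown with a small in-memory simulation of the disabled-products example. There is no database here: paginateDisabled() is a hypothetical name and the array stands in for the product table. The result set shrinks after each page, so later offsets skip rows.

```php
<?php
/**
 * Hypothetical simulation of LIMIT/OFFSET pagination over "all disabled products"
 * while the callback enables each product it sees (a non-idempotent query).
 *
 * @param array<int, bool> $enabled product id => enabled flag (modified in place)
 * @return list<int> ids seen by the callback
 */
function paginateDisabled(array &$enabled, int $pageSize): array
{
    $isDisabled = static fn (bool $on): bool => !$on;
    $seen = [];
    // The page count is computed once, before the loop, from the initial result set.
    $lastPage = (int) ceil(count(array_filter($enabled, $isDisabled)) / $pageSize);
    for ($page = 1; $page <= $lastPage; $page++) {
        // Re-run "WHERE enabled = 0 LIMIT $pageSize OFFSET ..." for every page.
        $disabledIds = array_keys(array_filter($enabled, $isDisabled));
        $offset = ($page - 1) * $pageSize;
        foreach (array_slice($disabledIds, $offset, $pageSize) as $id) {
            $seen[] = $id;
            $enabled[$id] = true; // the callback changes the very column the query filters on
        }
    }
    return $seen;
}
```

With six disabled products and a page size of 2, the callback sees products 1, 2, 5 and 6: after page 1 is enabled, offset 2 lands on product 5, so products 3 and 4 are never processed. The sketch only reproduces the problem; it does not show how the module paginates once $isIdempotent is false.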

Memory Limit

This module helps work around PHP's memory_limit, because the limit is enforced per
process: each child process works against its own limit, and everything a child allocates
is released when it exits. This means that even if the memory limit is set to a low value,
this module can still process large amounts of data without running out of memory, since
memory does not accumulate from one page to the next. Keep in mind, however, that overall
resource usage is higher because several processes run at once, so monitor the system and
adjust $pageSize and $maxChildrenProcess accordingly.
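The per-process nature of the memory limit can be observed with pcntl_fork() alone. In this hypothetical sketch (allocateInChild() is an illustrative name, not a module API), a child allocates a large buffer and exits; the parent's own memory usage is unaffected because the buffer only ever existed in the child.

```php
<?php
/**
 * Hypothetical sketch (requires ext-pcntl): allocate $bytes in a forked child and
 * report whether the child succeeded and how much the parent's memory usage grew.
 *
 * @return array{childOk: bool, parentGrowth: int}
 */
function allocateInChild(int $bytes): array
{
    $before = memory_get_usage();
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('pcntl_fork() failed');
    }
    if ($pid === 0) {
        // Child: this buffer counts against the child's memory_limit only.
        $buffer = str_repeat('x', $bytes);
        exit(strlen($buffer) === $bytes ? 0 : 1);
    }
    // Parent: wait for the child; its memory is returned to the OS when it exits.
    pcntl_waitpid($pid, $status);
    return [
        'childOk' => pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0,
        'parentGrowth' => memory_get_usage() - $before,
    ];
}
```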

Limitations

This module relies on the pcntl_fork() function, which is not available on Windows.

Conclusion

This module provides a useful tool for running commands or processing collections
and search criteria in a multi-threaded way, making it a great solution for improving
performance and reducing execution time.
The module is easy to install and use, and provides options for controlling the number
of child processes, timeout, and environment variables.

Disclaimer

The Multi-Threading for Magento 2 module is provided as is, without any guarantees or warranties.
While this module has been tested and is believed to be functional, it is important to note
that the use of multi-threading in PHP can be complex and may have unintended consequences.
As such, it is the responsibility of the user of this module to thoroughly test it in a
development environment before deploying it to a production environment.
I decline all responsibility for any issues or damages that may occur as a result of using
this module. With great power comes great responsibility, use it wisely.

Changelog

All notable changes to this project will be documented in this file.

[0.3.0] - 2026-02-07

Fixed

  • thread:processor exit status masking: command now returns failure when at least one wrapped iteration fails.
  • thread:processor argument passthrough: command now supports command arguments (including whitespace command strings like "help cache:clean").
  • thread:processor memory pressure on output-heavy commands: output is now flushed incrementally while child process is running instead of only at process end.
  • invalid max children configuration: ForkedProcessorRunner, ParallelStoreProcessor, and ParallelWebsiteProcessor now reject maxChildrenProcess <= 0.
  • dimension processors with empty inputs: store/website processors now return early without running the forked runner when no targets exist.

Added

  • thread:processor --fail-on-loop option to break iteration loop after first failure.
  • thread:processor --ignore-exit-code option to force success exit code while emitting a warning summary.
  • thread:processor command_args array argument.

Changed

  • ForkedProcessor fallback now targets explicitly failed pages instead of all non-completed pages.
  • ForkedProcessor supports configurable child DB reconnect behavior through constructor argument reconnectDatabaseInChild (now opt-in; default false to preserve DB session compatibility for temporary-table-based workloads).
  • ForkedProcessor compatibility mode (default) now terminates child workers with signals to avoid PHP child shutdown closing parent DB session state used by temporary-table-based workloads.

[0.2.0] - 2026-01-26

Fixed

  • Fix incorrect exit status check that caused false error logs
  • Enable pcntl_async_signals(true) for proper signal handling
  • Replace unreliable self-SIGKILL shutdown with proper exit codes
  • Add pcntl_wait return value check to prevent infinite loops
  • Fix typo isIdemPotent -> isIdempotent in interface
  • Fix @inheirtDoc -> @inheritDoc typos
Versions
Version Stability QA Status Compatibility Released
0.3.0 stable Fail Magento 2.4.7-2.4.8 2026-02-08 09:55:34
0.1.9 stable Not tested Not yet tested 2025-07-19 08:20:08
0.1.8 stable Not tested Not yet tested 2025-07-09 17:15:38
0.1.7 stable Not tested Not yet tested 2025-07-09 16:33:53
0.1.6 stable Not tested Not yet tested 2023-12-13 15:37:04
0.1.5 stable Not tested Not yet tested 2023-12-08 14:48:08
0.1.4 stable Not tested Not yet tested 2023-10-16 16:23:44
0.1.3 stable Not tested Not yet tested 2023-04-11 10:59:22
0.1.2 stable Not tested Not yet tested 2023-03-07 10:01:50
0.1.1 stable Not tested Not yet tested 2023-02-28 23:22:08
0.1.0 stable Not tested Not yet tested 2023-02-23 12:27:18

Requires 4

Package Constraint
ext-pcntl *
ext-posix *
magento/framework ^101.0.0|^102.0.0|^103.0.0
magento/module-store ^101

Compatibility

Each Magento release line is installed on its supported PHP versions, then the module is built (DI compilation + static-content deploy) and its unit and integration suites are run. The matrix shows the lines and PHP versions the module is confirmed to install and run on. Code-quality results further down (phpstan, phpcs, …) are reported separately and never affect compatibility.

Compatibility matrix (Magento × PHP)
Magento PHP 8.2 PHP 8.3 PHP 8.4 PHP 8.5
2.4.7 Pass Pass
2.4.8 Pass Pass
2.4.9 Fail di error Fail di error

Code Quality

Advisory checks against the module's source. Static analysis runs once across the whole module; PHPStan re-runs per Magento + PHP version because resolvable symbols differ between releases. These NEVER affect the Compatibility badge — a phpcs finding can't make a module incompatible.

Static analysis

Coding standards (phpcs), mess detection (phpmd), copy-pasted code (cpd), PHP cross-version compatibility, composer.json validity. Each runs once for the whole module.

Static analysis results
Tool Status Findings Summary
PHPCS Fail 49 1 error, 48 warnings (ruleset: Magento2) — 8 auto-fixable with phpcbf
PHPMD Warning 14 14 rule violations (CyclomaticComplexity:3, MissingImport:3, NPathComplexity:2, ExcessiveMethodLength:2, UnusedFormalParameter:2)
Cpd Pass 0
Composer validate Info 1 valid; 1 advisory note (composer validate --strict)

PHPStan

Type-checks the module's PHP against a real Magento install at the configured gate level. Re-runs per Magento and PHP version because resolvable symbols differ between releases.

PHPStan results by Magento and PHP version
Magento PHP 8.2 PHP 8.3 PHP 8.4 PHP 8.5
2.4.7 9 9
2.4.8 9 9
2.4.9 12 12

Tests

Unit and integration suites, run for each applicable Magento and PHP version. A test failure speaks to the module's behaviour, not its compatibility with a Magento line, so it is reported here separately and never reddens the compatibility matrix.

Unit tests

Unit tests results by Magento and PHP version
Magento PHP 8.2 PHP 8.3 PHP 8.4 PHP 8.5
2.4.7 N/A N/A
2.4.8 N/A N/A
2.4.9 N/A N/A

Integration tests

Integration tests results by Magento and PHP version
Magento PHP 8.2 PHP 8.3 PHP 8.4 PHP 8.5
2.4.7 N/A N/A
2.4.8 N/A N/A
2.4.9 N/A N/A

Security

Security checks run directly against the module: an audit of its declared dependencies for known vulnerabilities (composer audit) and a scan of its source for malware and web-shell signatures. Each runs once. A malware detection fails the version outright.

Security results
Tool Status Findings Summary
Composer audit Pass 0
Malware scan Pass 0
License
MIT