Zaid
Al-Jubouri

HomeAboutContact

Pitfalls of adding multithreading to a Node.js project using threads.js

Multithreading can be complicated; here are three mistakes I made trying to incorporate it into a Node.js project using threads.js.

As part of my series on random number generation, I mentioned a suite of statistical tests used to determine whether a set of numbers is independent and identically distributed. I wrote these tests using Node.js and TypeScript—for convenience—without much thought given to performance and whether this approach was truly the best available to me.

When I started actually running the tests on real datasets containing a million values, I was hit with a completion time nearing three hours. I thought a possible “easy” fix would be to utilise more CPU power, as it was clear that the bulk of the time was spent running each permutation of the dataset through the statistical tests. This didn’t have to occur synchronously; each array could have its test statistics calculated independently of the others. The worker_threads module has been a stable feature since Node.js v12, and this seemed like a good opportunity to try it out. I knew this would not be a panacea for my performance woes, but it couldn’t make things worse.

Of course, the first thing I did was look for a higher-level library that abstracts away much of the worker threads syntax, and provides the thread-pooling logic I required. The titular threads.js was the package of choice. The documentation is light but functional, and the initial implementation was straightforward. I created a worker class which takes an array of values and runs the statistical tests on said array. A thread pool can then be initialised and tasks can be queued, executing when threads become available. I began to run the program on various datasets and everything looked great—it was utilising all of my CPU’s threads, memory usage was stable, and it finished nearly four times faster in comparison to the original implementation. A closer look at the actual results, however, led to a series of mistakes which meant that I was not nearly done.

Mistake one — shuffling the array in the parent thread

The test suite needs to be run on 10,000 permutations of the original dataset, and this permuting is done using the Fisher–Yates shuffle algorithm. The main loop, after implementing the workers and pools, looked like this:

index.ts
// Start a threads.js pool with a worker and the number of CPU threads to use
const pool = Pool(() => spawn<ExampleWorker>(new Worker('./workers/ExampleWorker')), 12);
for (let i = 0; i < 10000; i++) {
// Shuffle the original array
const shuffledArray = shuffleArray(originalArray);
// Add a task to the pool to run a function on the
// shuffled array and run another function on the result
pool
.queue(async (exampleWorker) => await exampleWorker.doSomething(shuffledArray))
.then((result) => {
const finalResult = doAnotherThingWithThe(result);
});
});
}

When I ran it, I noticed my results were extremely uniform, i.e. every permutation held the exact same comparison to the original dataset. Prefixing DEBUG=threads:* to my run command enabled thread.js’ internal logging, and I could see clearly that the first shuffled array sent to each worker was being reused ad infinitum. I cannot explain why this occurs (and if someone can, I’d love to know), but the resolution was to expose the shuffling method on the worker class and ask the worker to perform it instead of the parent thread:

index.ts
const pool = Pool(() => spawn<ExampleWorker>(new Worker('./workers/ExampleWorker')), 12);
for (let i = 0; i < 10000; i++) {
// shuffledArray will now be different on each loop
const shuffledArray = await pool.queue(
async (exampleWorker) => await exampleWorker.shuffleArray(originalArray)
);
pool
.queue(async (exampleWorker) => await exampleWorker.doSomething(shuffledArray))
.then((result) => {
const finalResult = doAnotherThingWithThe(result);
});
});
}

Problem solved, and onto the next.

Mistake two — updating the counters on Promise fulfilment

This one was mostly due to wishful thinking. As a way of keeping memory efficient, the original, synchronous code would run the statistical tests on a permutation, and when complete it would immediately compare against the original dataset’s results to determine which counter (if any) to increment. I simply moved the same logic to the fulfilment handler:

index.ts
const pool = Pool(() => spawn<ExampleWorker>(new Worker('./workers/ExampleWorker')), 12);
const someCounterZero = 0;
const someCounterOne = 0;
for (let i = 0; i < 10000; i++) {
const shuffledArray = await pool.queue(
async (exampleWorker) => await exampleWorker.shuffleArray(originalArray)
);
pool
.queue(async (exampleWorker) => await exampleWorker.doSomething(shuffledArray))
.then((result) => {
const finalResult = doAnotherThingWithThe(result);
if (finalResult > 10) someCounterZero++;
else someCounterOne++;
});
});
}

As you can probably guess, trying to concurrently update a variable didn’t work out very well. When multiple workers complete their task at the same time and the parent thread uses the results to increment the same counter, there’s no guarantee it contains the latest value, and so the final result will be less than it should be. One way to resolve this could be to use JavaScript’s Atomics object, but I opted for the simpler strategy of utilising await Promise.all():

index.ts
const pool = Pool(() => spawn<ExampleWorker>(new Worker('./workers/ExampleWorker')), 12);
const someCounterZero = 0;
const someCounterOne = 0;
for (let i = 0; i < 10000; i++) {
const shuffledArray = await pool.queue(
async (exampleWorker) => await exampleWorker.shuffleArray(originalArray)
);
// Add each task run to an array
myTasks.push(
pool.queue(async (exampleWorker) => {
await exampleWorker.doSomething(shuffledArray);
})
);
}

It does mean that the 10,000 result objects need to be processed synchronously after all the statistical tests have completed, but in my case this has negligible performance impact:

index.ts
// ... Previous for-loop code
// Wait for all tasks to complete
await Promise.all(myTasks);
// Loop through the task results and perform the comparisons
for (let i = 0; i < myTasks.length; i++) {
myTasks[i]
.then((result) => {
const finalResult = doAnotherThingWithThe(result);
if (finalResult > 10) someCounterZero++;
else someCounterOne++;
});
});
}

The counters will now increment sequentially and contain the correct values when all is said and done.

Mistake three — passing objects between parent and workers

If you have some familiarity with worker threads (or possibly multithreading in general), you might have noticed something missing from the previous code snippets. Running the program on small datasets worked well. However, as soon as I started using my dataset of a million numbers, I would see the memory usage climb before eventually causing an error:

FATAL ERROR: MarkCompactCollector: young object promotion failed Allocation failed - JavaScript heap out of memory

The heap runs out of space and the program crashes. Even when utilising the V8 option --max-old-space-size, unused memory is not freed fast enough and the same error occurs. When objects are sent between worker threads, their structure is copied over. It also seems like these copied objects are not cleared off the heap once they’ve been used. Node has the function process.memoryUsage() which provides a snapshot of your program’s memory usage. Some analysis—excessive use of console.log() and breakpoints—showed that whenever something was returned from a worker to the parent thread, the heap usage increased, eventually resulting in the fatal error.

There’s a JavaScript concept of transferable objects, where ownership of the object is transferred between threads as opposed to simply copying the entire structure over—no duplicates are created, no extra memory is required. If you’ve experience with using C/C++, it’s like passing by reference. In threads.js, the helper function Transfer() takes a transferable object (ArrayBufferLikes) and marks it to be transferred between threads. The following shows this in action when returning objects from the worker thread to the parent:

exampleWorker.ts
import { serialize } from 'v8';
import { TransferDescriptor } from 'threads';
import { expose, Transfer } from 'threads/worker';
import { shuffleArray, doYetAnotherThing } from 'somewhere';
const exampleWorker = {
shuffleArray: (originalArray: number[]): TransferDescriptor => {
const shuffledArray = shuffleArray(originalArray);
// Convert the shuffled array into a Uint32Array which is ArrayBufferLike
const uint32ShuffledArray = new Uint32Array(shuffledArray);
// threads.js' Transfer method signals ownership of the result should be transferred
return Transfer(uint32ShuffledArray.buffer, [uint32ShuffledArray.buffer]);
},
doSomething: (shuffledArray: number[]): TransferDescriptor => {
const result = doYetAnotherThing(shuffledArray);
// Use v8.serialize to transform the result into an ArrayBufferLike
const serializedResult = serialize(result);
// threads.js' Transfer method signals ownership of the result should be transferred
return Transfer(serializedResult.buffer, [serializedResult.buffer]);
},
};
export type ExampleWorker = typeof exampleWorker;
expose(exampleWorker);

For an object that doesn’t naturally fit into an ArrayBufferLike, the V8 serialize() and deserialize() methods work to reshape it into something that will, and eventually restore it to its former glory. Now when I ran the code, the heap usage did not increase at a rate the garbage collector cannot maintain, the results were accurate, and order seemed to be restored—until I noticed a spike in memory usage to 10 GB.

The issue causing this is similar except it’s now a crack in what Node.js calls “external” memory usage, the makeup of which is all the C++ objects bound to JavaScript objects. Since we already transferised—a verb I’ve made up that denotes marking an object as transferable—the objects being returned from the workers, it’s logical that those being passed to the workers also need transferising. As these are number arrays, it’s simply a case of using Uint32Array’s constructor to convert it into an ArrayBufferLike which can then be used with Transfer():

index.ts
for (let i = 0; i < 10000; i++) {
const shuffledArray = await pool.queue(async (exampleWorker) => {
// Convert the original array to Uint32Array
const uint32OriginalArray = new Uint32Array(originalArray);
// Transfer ownership of the Uint32Array to the worker
return await exampleWorker.shuffleArray(
Transfer(uint32OriginalArray.buffer, [uint32OriginalArray.buffer])
);
});
myTasks.push(
pool.queue(async (exampleWorker) => {
// Convert the shuffled array to Uint32Array
const uint32ShuffledArray = new Uint32Array(shuffledArray);
// Transfer ownership of the Uint32Array to the worker
return await exampleWorker.doSomething(
Transfer(uint32ShuffledArray.buffer, [uint32ShuffledArray.buffer])
);
})
);
}

If the functions your worker runs rely on non-ArrayBufferLike variables, then you need to make sure to convert the input to the type expected by those functions:

exampleWorker.ts
import { serialize } from 'v8';
import { TransferDescriptor } from 'threads';
import { expose, Transfer } from 'threads/worker';
import { shuffleArray, doYetAnotherThing } from 'somewhere';
const exampleWorker = {
shuffleArray: (originalArray: any): TransferDescriptor => {
// Convert the TransferDescriptor into a Uint32Array into a number array
// This looks silly but TypeScript compilation will fail, and if you force-compile it,
// a runtime TypeError will occur on execution
const numberArray = Array.from(new Uint32Array(originalArray));
const shuffledArray = shuffleArray(numberArray);
const uint32ShuffledArray = new Uint32Array(shuffledArray);
return Transfer(uint32ShuffledArray.buffer, [uint32ShuffledArray.buffer]);
},
doSomething: (shuffledArray: any): TransferDescriptor => {
// Same as above
const numberArray = Array.from(new Uint32Array(shuffledArray));
const result = doYetAnotherThing(numberArray);
const serializedResult = serialize(result);
return Transfer(serializedResult.buffer, [serializedResult.buffer]);
},
};
export type ExampleWorker = typeof exampleWorker;
expose(exampleWorker);

With this final piece of duct tape applied to my leaky code, the program’s memory usage hovers around 1 GB with twelve worker threads.

Closure

The end result is a script which can employ multiple workers, each picking away at the dataset. Running the program on 1,000,000 values with one thread takes about two hours and forty-five minutes to complete. With twelve threads, it takes about forty-five minutes, roughly 3.5 times faster. A solid gain, but also indicative of other efficiencies that can be made in the statistical test implementations themselves. Retrospectively, opting for convenience and familiarity in using Node.js might not have been the best choice—perhaps another language with multithreading idiosyncrasies would have provided a better overall experience. However, that investigation is for another time and possibly another post.

The code snippets in this post omit a lot of detail in my worker thread usage. If you would like to see how it actually all works together, feel free to check out the project's source code.

• • •
Home.
© 2021 Zaid Al-Jubouri