Demonstrating Semaphores Application in Nestjs - Transaction Processing System

In high-throughput transaction processing systems, specific challenges inevitably arise: Resource Exhaustion and Race Conditions. When your API receives requests faster than your backend can process them, simply pushing everything into an array or executing promises immediately is a recipe for disaster. You risk crashing your Node.js process (Out of Memory) or overwhelming your database.
In a previous article, we discussed this in a Java context. Today, we are porting this robust solution to NestJS and TypeScript. We will implement a Bounded Buffer using Semaphores to handle backpressure and coordinate producer-consumer flows elegantly.
Grab the code from the repo if you want to skip the talk and start hacking. I’ve included some test cases you can poke at, see if you can make them even better. 😌
The Challenge: The Node.js Event Loop
Node.js is single-threaded. While this prevents traditional memory-corruption race conditions (like two threads writing to the same memory address simultaneously), logical race conditions and resource bottlenecks are still major risks.
The Scenario:
Producers: Thousands of HTTP requests hitting your NestJS Controller to initiate payments.
Consumer: A background service that processes these payments (e.g., calling a slow 3rd-party banking API).
The Risk: If the Consumer is slower than the Producer, an unbound queue will grow until the server crashes.
The Solution: The Semaphore Pattern
To solve this, we create a Bounded Buffer, a queue with a fixed capacity. We use Semaphores to coordinate access:
spacesSemaphore: Tracks available slots. Producerswaithere if the buffer is full.itemsSemaphore: Tracks filled slots. Consumerswaithere if the buffer is empty.mutex(Lock): Ensures atomic access to the buffer (useful if our buffer logic becomes complex or involves async validation steps).
1. Implementing the Semaphore Utility
Since JavaScript's standard library doesn't include a Semaphore class, we'll implement a Promise-based one. This allows us to "block" execution using await without freezing the Node.js Event Loop.
src/utils/semaphore.util.ts
TypeScript
export class Semaphore {
private tasks: (() => void)[] = [];
constructor(
// Would typically be the max number of transactions that should be in queue at a time
private count: number,
) {}
/**
* Acquires a permit. If no permits are available,
* returns a Promise that resolves when a permit is released.
*/
async acquire(): Promise<void> {
if (this.count > 0) {
this.count--;
return Promise.resolve();
}
// If no permits, we push the `resolve` function to a queue
// and wait for someone to call release()
return new Promise<void>((resolve) => {
this.tasks.push(resolve);
});
}
/**
* Releases a permit. If there are waiting tasks,
* wakes up the next task instead of incrementing the counter.
*/
release(): void {
if (this.tasks.length > 0) {
const nextTask = this.tasks.shift();
if (nextTask) nextTask(); // Wake up the waiting thread
} else {
this.count++;
}
}
}
2. The Transaction Model
Let's define what we are processing.
src/transactions/transaction.model.ts
TypeScript
export interface Transaction {
id: string;
amount: number;
accountId: string;
timestamp: Date;
}
3. The Bounded Buffer Service
This is the core implementation. We use the @Injectable decorator to make it a standard NestJS service.
src/transactions/transaction.service.ts
TypeScript
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { Transaction } from './transaction.model';
import { Semaphore } from '../util/semaphore.util';
@Injectable()
export class TransactionService implements OnModuleInit {
private readonly logger = new Logger(TransactionService.name);
/**
* Number of transactions that can be in the buffer at once
* Could be populated sequentially from a queue == Redis and BullMQ
*/
private readonly BUFFER_CAPACITY = 10;
private readonly buffer: Transaction[] = [];
// Semaphores
private readonly items = new Semaphore(0); // Starts with 0 items
private readonly spaces = new Semaphore(this.BUFFER_CAPACITY); // Starts with full capacity
private readonly mutex = new Semaphore(1); // Mutual exclusion for buffer operations
// Initialize the Consumer loop when the module starts.
onModuleInit() {
this.startConsumerLoop();
}
/**
* PRODUCER: Called by the Controller.
* This will PAUSE (await) if the buffer is full.
*/
async submitTransaction(transaction: Transaction): Promise<void> {
this.logger.log(`Attempting to submit tx: ${transaction.id}`);
// Wait for space. If buffer is full (10), this await holds here.
await this.spaces.acquire();
// Critical Section: Add to buffer
await this.mutex.acquire();
try {
this.buffer.push(transaction);
this.logger.log(`Transaction Added. Buffer size: ${this.buffer.length}`);
} finally {
this.mutex.release();
}
// Signal that a new transaction is available for the consumer
this.items.release();
}
/**
* CONSUMER: Runs continuously in the background.
*/
private startConsumerLoop() {
// We start a non-blocking loop
process.nextTick(async () => {
while (true) {
try {
await this.processNextTransaction();
} catch (error) {
this.logger.error('Error in consumer loop', error);
}
}
});
}
private async processNextTransaction() {
// Wait for transaction. If buffer is empty, this await holds here.
await this.items.acquire();
let transaction: Transaction | undefined;
// Critical Section: Remove from buffer
await this.mutex.acquire();
try {
transaction = this.buffer.shift();
} finally {
this.mutex.release();
}
// Signal that a space has opened up
this.spaces.release();
if (transaction) {
await this.executeBusinessLogic(transaction);
}
}
// Run Business Logic
protected async executeBusinessLogic(transaction: Transaction) {
this.logger.log(`PROCESSING tx: ${transaction.id}...`);
// Simulate latency (500ms processing time)
await new Promise((resolve) => setTimeout(resolve, 500));
this.logger.log(`COMPLETED tx: ${transaction.id}`);
}
}
4. The Controller (Producer Entry Point)
Finally, we expose an endpoint to accept transactions.
src/transactions/transactions.controller.ts
TypeScript
import { Controller, Post, Body } from '@nestjs/common';
import { TransactionService } from './transaction.service';
import * as crypto from 'crypto';
@Controller('transactions')
export class TransactionController {
constructor(private readonly bufferService: TransactionService) {}
@Post()
async createTransaction(@Body() body: { amount: number; accountId: string }) {
const id = crypto.randomUUID();
const tx = {
id,
amount: body.amount,
accountId: body.accountId,
timestamp: new Date(),
};
// This call might wait if the buffer is full!
// In a real API, you might want to wrap this in a timeout
// to return a 503 Service Unavailable if the buffer is full for too long.
// Or better still utilize a queueing system like BullMQ with Redis,
// so as not to overload the server.
await this.bufferService.submitTransaction(tx);
return { status: 'Accepted', id: tx.id };
}
}
Why This Implementation is Robust
Backpressure Handling: If you flood the
/transactionsendpoint, thespaces.acquire()in the service will strictly limit thebufferarray to 10 items. Once full, the API request handler will simplyawait(pause) until the Consumer frees up a slot. This protects your server's RAM.Event Loop Safety: Unlike Java threads, we aren't sleeping the OS thread. We are using
awaitwith Promises. This means while a request waits for a buffer slot, the Node.js Event Loop is still free to handle other lightweight tasks (like health checks or read-only requests).Separation of Concerns: The Producer (Controller) only cares about submission. The Consumer (Background Loop) focuses on processing. The Semaphores handle the glue between them.
Conclusion
Adapting the Semaphore pattern to NestJS allows us to build resilient systems that can withstand traffic spikes without crashing. By controlling the flow of data before it hits your database or expensive processing logic, you ensure your application remains stable under load.
Feel free to reach out with a dm if you feel stuck or have suggestions at my LinkedIn.
Happy coding!



