fabianwikstrom opened a new issue, #372: URL: https://github.com/apache/pulsar-client-node/issues/372
Hello, we need some advice on how to increase throughput in our Pulsar consumers. Here are some details: - We run 1 consumer per pod in a Kubernetes cluster. - We run in the `Shared` subscription mode; strict ordering does not matter for us. - To keep debugging simple, we have not enabled batching. - We've been using the `listener` pattern. We've found that our messages are being processed sequentially, which leads to poor throughput. We need to speed things up a bit. What is the recommended way to do so? I've attached the two options we are considering below: 1. Increase the number of consumers and use the listener pattern. 2. Use the receiver pattern with multiple workers. We'd like to understand what the community considers best practice and why. Thank you :) ### Running multiple consumers per pod ``` import { faker } from '@faker-js/faker'; import Pulsar from 'pulsar-client'; process.env.ENVIRONMENT = 'development'; process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650'; const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`; const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`; const CONCURRENCY = 5; const SEND_NUMBER = 10; async function handleMessage( message: Pulsar.Message, consumer: Pulsar.Consumer, ): Promise<void> { console.log('Received message: ', message.getData().toString()); await new Promise((resolve) => setTimeout(resolve, 1000)); await consumer.acknowledge(message); } async function main() { const client = new Pulsar.Client({ serviceUrl: process.env.PULSAR_SERVICE_URL as string, log: logconfig(), messageListenerThreads: CONCURRENCY, }); console.log('Topic: ', PULSAR_TOPIC); console.log('Subscription: ', PULSAR_SUBSCRIPTION); // Create the main consumer const consumers = []; const counter = new Map<string, number>(); const subscriptionType = 'Shared'; const ackTimeoutMs = 10_000; const nAckRedeliverTimeoutMs = 2_000; const batchIndexAckEnabled = false; for (let i = 0; i < CONCURRENCY; i += 1) { const consumer = await 
client.subscribe({ topic: PULSAR_TOPIC, subscription: PULSAR_SUBSCRIPTION, subscriptionType, ackTimeoutMs, nAckRedeliverTimeoutMs, receiverQueueSize: 10, batchIndexAckEnabled, listener: (message, consumer) => handleMessage(message, consumer), }); consumers.push(consumer); } // Send messages const producer = await client.createProducer({ topic: PULSAR_TOPIC }); for (let i = 0; i < SEND_NUMBER; i += 1) { const msg = `test-message-${i}`; counter.set(msg, 0); await producer.send({ data: Buffer.from(msg) }); } // Sleep 50 seconds to wait for the messages to be processed await new Promise((resolve) => setTimeout(resolve, 50000)); await producer.close(); for (const consumer of consumers) { await consumer.close(); } process.exit(0); } void main(); function logconfig() { return (level: any, _file: any, _line: any, message: any) => { switch (level) { case Pulsar.LogLevel.DEBUG: console.debug(message); break; case Pulsar.LogLevel.INFO: console.info(message); break; case Pulsar.LogLevel.WARN: console.warn(message); break; case Pulsar.LogLevel.ERROR: console.error(message); break; } }; } ``` ### Increasing concurrency per consumer ``` import { faker } from '@faker-js/faker'; import Pulsar from 'pulsar-client'; import logger from '../../utils/logger'; process.env.ENVIRONMENT = 'development'; process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650'; const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`; const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`; const CONCURRENCY = 5; const SEND_NUMBER = 10; async function handleMessage( message: Pulsar.Message, consumer: Pulsar.Consumer, ): Promise<void> { console.log('Received message: ', message.getData().toString()); await new Promise((resolve) => setTimeout(resolve, 1000)); await consumer.acknowledge(message); } async function main() { const client = new Pulsar.Client({ serviceUrl: process.env.PULSAR_SERVICE_URL as string, log: logconfig() }); console.log('Topic: ', PULSAR_TOPIC); console.log('Subscription: ', PULSAR_SUBSCRIPTION); 
// Create the main consumer const consumers = []; const counter = new Map<string, number>(); const subscriptionType = 'Shared'; const ackTimeoutMs = 10_000; const nAckRedeliverTimeoutMs = 2_000; const batchIndexAckEnabled = false; const consumer = await client.subscribe({ topic: PULSAR_TOPIC, subscription: PULSAR_SUBSCRIPTION, subscriptionType, ackTimeoutMs, nAckRedeliverTimeoutMs, receiverQueueSize: 10, batchIndexAckEnabled, }); await listen( consumer, async (consumer, message) => handleMessage(message, consumer), CONCURRENCY, ); // Send messages const producer = await client.createProducer({ topic: PULSAR_TOPIC }); for (let i = 0; i < SEND_NUMBER; i += 1) { const msg = `test-message-${i}`; counter.set(msg, 0); await producer.send({ data: Buffer.from(msg) }); } // Sleep 50 seconds to wait for the messages to be processed await new Promise((resolve) => setTimeout(resolve, 50000)); await producer.close(); await consumer.close(); process.exit(0); } void main(); /** * Receive messages from a Pulsar consumer and process them concurrently. * * @param consumer - Pulsar consumer to receive messages from. * @param listener - Message handler function. * @param concurrency - Maximum number of messages to process at a time. 
 */ export async function listen( consumer: Pulsar.Consumer, listener: ( consumer: Pulsar.Consumer, message: Pulsar.Message, ) => Promise<void>, concurrency = 1, ): Promise<void> { const workers = new Array<Promise<void>>(); for (let i = 0; i < concurrency; i++) { const worker = async () => { for (;;) { try { const message = await consumer.receive(); await listener(consumer, message); } catch (err: any) { logger.error(`Message processing error: ${err.message}`); } } }; workers.push(worker()); } await Promise.all(workers); } function logconfig() { return (level: any, _file: any, _line: any, message: any) => { switch (level) { case Pulsar.LogLevel.DEBUG: console.debug(message); break; case Pulsar.LogLevel.INFO: console.info(message); break; case Pulsar.LogLevel.WARN: console.warn(message); break; case Pulsar.LogLevel.ERROR: console.error(message); break; } }; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org