fabianwikstrom opened a new issue, #372:
URL: https://github.com/apache/pulsar-client-node/issues/372

   Hello, we need some advice on how to increase throughput in our Pulsar 
consumers. Here are some details: 
   - We run 1 consumer per pod in a kubernetes cluster 
   - We run in the `Shared` subscription mode. Strict ordering does not matter 
for us 
   - To keep debugging simple, we have not enabled batching 
   - We've been using the `listener` pattern
   
   We've found that our messages are being processed sequentially, which leads 
to poor throughput, and we need to speed things up. What is the recommended 
way to do so? Below are the two options we are considering:
   1. Increase the number of consumers and use the listener pattern
   2. Use the `receive()` pattern with multiple concurrent workers
   
   We'd like to understand what the community considers best practice and why. 
Thank you :) 
   
   ### Running multiple consumers per pod 
   ```
   import { faker } from '@faker-js/faker';
   import Pulsar from 'pulsar-client';
   
   // Local development settings. They are written into process.env because
   // main() reads the broker URL back from the environment.
   Object.assign(process.env, {
     ENVIRONMENT: 'development',
     PULSAR_SERVICE_URL: 'pulsar://localhost:6650',
   });

   // A random alphabetic suffix gives every run its own topic and subscription.
   const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;
   const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;

   /** Number of consumers (and listener threads) to start. */
   const CONCURRENCY = 5;
   /** Number of test messages the producer publishes. */
   const SEND_NUMBER = 10;
   
   /**
    * Demo message handler: logs the payload, simulates one second of work,
    * then acknowledges the message on the consumer that delivered it.
    */
   async function handleMessage(
     message: Pulsar.Message,
     consumer: Pulsar.Consumer,
   ): Promise<void> {
     const text = message.getData().toString();
     console.log('Received message: ', text);

     // Stand-in for real processing time.
     await new Promise<void>((done) => {
       setTimeout(done, 1000);
     });

     await consumer.acknowledge(message);
   }
   
   /**
    * Demo: start CONCURRENCY listener-based consumers on one Shared
    * subscription, publish SEND_NUMBER messages, wait, then shut down.
    *
    * Parallelism comes from running several consumers on the same Shared
    * subscription; each consumer invokes its own `listener` callback.
    */
   async function main(): Promise<void> {
     const client = new Pulsar.Client({
       serviceUrl: process.env.PULSAR_SERVICE_URL as string,
       log: logconfig(),
       messageListenerThreads: CONCURRENCY,
     });

     console.log('Topic: ', PULSAR_TOPIC);
     console.log('Subscription: ', PULSAR_SUBSCRIPTION);

     // Create CONCURRENCY consumers that all attach to the same subscription.
     const consumers: Pulsar.Consumer[] = [];
     const subscriptionType = 'Shared';
     const ackTimeoutMs = 10_000;
     const nAckRedeliverTimeoutMs = 2_000;
     const batchIndexAckEnabled = false;

     for (let i = 0; i < CONCURRENCY; i += 1) {
       const consumer = await client.subscribe({
         topic: PULSAR_TOPIC,
         subscription: PULSAR_SUBSCRIPTION,
         subscriptionType,
         ackTimeoutMs,
         nAckRedeliverTimeoutMs,
         // NOTE(review): each consumer may prefetch up to this many messages;
         // a large queue lets one consumer hold work the others could take.
         receiverQueueSize: 10,
         batchIndexAckEnabled,
         // Catch handler failures so they do not become unhandled rejections,
         // and nack the message so it is redelivered after
         // nAckRedeliverTimeoutMs instead of waiting for ackTimeoutMs.
         listener: (message, consumer) =>
           handleMessage(message, consumer).catch((err: unknown) => {
             console.error('Message processing error: ', err);
             consumer.negativeAcknowledge(message);
           }),
       });
       consumers.push(consumer);
     }

     // Send messages
     const producer = await client.createProducer({ topic: PULSAR_TOPIC });
     for (let i = 0; i < SEND_NUMBER; i += 1) {
       await producer.send({ data: Buffer.from(`test-message-${i}`) });
     }

     // Sleep 50 seconds to give the consumers time to process the messages.
     await new Promise((resolve) => setTimeout(resolve, 50_000));

     await producer.close();
     for (const consumer of consumers) {
       await consumer.close();
     }
     await client.close();
     process.exit(0);
   }
   
   // Run the demo. A failure is reported and turned into a non-zero exit code
   // rather than being left as an unhandled promise rejection.
   main().catch((err: unknown) => {
     console.error('Fatal error: ', err);
     process.exit(1);
   });
   
   /**
    * Build the `log` callback for `Pulsar.Client`. Each client log line is
    * forwarded to the console method that matches its level. Lines whose level
    * is not DEBUG, INFO, WARN or ERROR are dropped.
    */
   function logconfig() {
     const sinks = new Map<unknown, (text: unknown) => void>([
       [Pulsar.LogLevel.DEBUG, (text) => console.debug(text)],
       [Pulsar.LogLevel.INFO, (text) => console.info(text)],
       [Pulsar.LogLevel.WARN, (text) => console.warn(text)],
       [Pulsar.LogLevel.ERROR, (text) => console.error(text)],
     ]);

     return (level: any, _file: any, _line: any, message: any) => {
       const sink = sinks.get(level);
       if (sink !== undefined) {
         sink(message);
       }
     };
   }
   ```
   
   ### Increasing concurrency per consumer 
   
   ```
   import { faker } from '@faker-js/faker';
   import Pulsar from 'pulsar-client';
   
   import logger from '../../utils/logger';
   
   // Local development settings. They are written into process.env because
   // main() reads the broker URL back from the environment.
   Object.assign(process.env, {
     ENVIRONMENT: 'development',
     PULSAR_SERVICE_URL: 'pulsar://localhost:6650',
   });

   // A random alphabetic suffix gives every run its own topic and subscription.
   const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;
   const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;

   /** Number of concurrent receive() workers sharing the single consumer. */
   const CONCURRENCY = 5;
   /** Number of test messages the producer publishes. */
   const SEND_NUMBER = 10;
   
   /**
    * Demo message handler: logs the payload, simulates one second of work,
    * then acknowledges the message on the consumer that delivered it.
    */
   async function handleMessage(
     message: Pulsar.Message,
     consumer: Pulsar.Consumer,
   ): Promise<void> {
     const text = message.getData().toString();
     console.log('Received message: ', text);

     // Stand-in for real processing time.
     await new Promise<void>((done) => {
       setTimeout(done, 1000);
     });

     await consumer.acknowledge(message);
   }
   
   /**
    * Demo: a single consumer drained by CONCURRENCY concurrent receive()
    * workers. Starts the workers, publishes SEND_NUMBER messages, waits, then
    * shuts everything down.
    */
   async function main(): Promise<void> {
     const client = new Pulsar.Client({
       serviceUrl: process.env.PULSAR_SERVICE_URL as string,
       log: logconfig(),
     });

     console.log('Topic: ', PULSAR_TOPIC);
     console.log('Subscription: ', PULSAR_SUBSCRIPTION);

     // Create the single consumer that all workers share.
     const subscriptionType = 'Shared';
     const ackTimeoutMs = 10_000;
     const nAckRedeliverTimeoutMs = 2_000;
     const batchIndexAckEnabled = false;

     const consumer = await client.subscribe({
       topic: PULSAR_TOPIC,
       subscription: PULSAR_SUBSCRIPTION,
       subscriptionType,
       ackTimeoutMs,
       nAckRedeliverTimeoutMs,
       receiverQueueSize: 10,
       batchIndexAckEnabled,
     });

     // Start the workers in the background. listen() only settles after every
     // worker has stopped, so awaiting it here would block main() forever and
     // the producer below would never be created.
     const stopListening = new AbortController();
     void listen(
       consumer,
       async (activeConsumer, message) => handleMessage(message, activeConsumer),
       CONCURRENCY,
       stopListening.signal,
     ).catch((err: unknown) => {
       console.error('Listener loop failed: ', err);
     });

     // Send messages
     const producer = await client.createProducer({ topic: PULSAR_TOPIC });
     for (let i = 0; i < SEND_NUMBER; i += 1) {
       await producer.send({ data: Buffer.from(`test-message-${i}`) });
     }

     // Sleep 50 seconds to give the workers time to process the messages.
     await new Promise((resolve) => setTimeout(resolve, 50_000));

     // Abort before closing the consumer. The receive() failures caused by the
     // close are then treated as shutdown instead of being logged as errors.
     stopListening.abort();
     await producer.close();
     await consumer.close();
     await client.close();
     process.exit(0);
   }

   // Run the demo. A failure is reported and turned into a non-zero exit code
   // rather than being left as an unhandled promise rejection.
   main().catch((err: unknown) => {
     console.error('Fatal error: ', err);
     process.exit(1);
   });

   /**
    * Receive messages from a Pulsar consumer and process them concurrently.
    *
    * Starts `concurrency` workers. Each one loops on `consumer.receive()` and
    * awaits `listener` for the message it got, so at most `concurrency`
    * messages are processed at a time. This helper never acknowledges
    * messages itself; the listener must do that. If the listener rejects, the
    * error is logged and the message is negatively acknowledged so it can be
    * redelivered.
    *
    * The returned promise settles only after every worker has stopped. Without
    * a `signal` that never happens, so callers normally should not await it.
    * To shut down, abort `signal` and then close the consumer. Closing is
    * expected to fail any pending `receive()`, which a worker treats as a clean
    * exit once the signal is aborted. NOTE(review): confirm that pending
    * receive() calls are rejected on close in the pulsar-client version in use.
    *
    * @param consumer - Pulsar consumer to receive messages from.
    * @param listener - Message handler function; responsible for acknowledging.
    * @param concurrency - Maximum number of messages to process at a time; must
    *   be a positive integer.
    * @param signal - Optional abort signal that stops the workers.
    * @throws RangeError (as a rejection) if `concurrency` is not a positive
    *   integer.
    */
   export async function listen(
     consumer: Pulsar.Consumer,
     listener: (
       consumer: Pulsar.Consumer,
       message: Pulsar.Message,
     ) => Promise<void>,
     concurrency = 1,
     signal?: AbortSignal,
   ): Promise<void> {
     if (!Number.isInteger(concurrency) || concurrency < 1) {
       throw new RangeError(
         `concurrency must be a positive integer, got ${concurrency}`,
       );
     }

     const errorText = (err: unknown): string =>
       err instanceof Error ? err.message : String(err);

     const worker = async (): Promise<void> => {
       while (!signal?.aborted) {
         let message: Pulsar.Message;
         try {
           message = await consumer.receive();
         } catch (err: unknown) {
           // A receive() failure after abort comes from the consumer being
           // closed during shutdown; stop quietly instead of spinning.
           if (signal?.aborted) {
             return;
           }
           logger.error(`Message receive error: ${errorText(err)}`);
           continue;
         }

         try {
           await listener(consumer, message);
         } catch (err: unknown) {
           logger.error(`Message processing error: ${errorText(err)}`);
           // Request redelivery now rather than waiting for the ack timeout.
           try {
             consumer.negativeAcknowledge(message);
           } catch (nackErr: unknown) {
             logger.error(`Negative acknowledge error: ${errorText(nackErr)}`);
           }
         }
       }
     };

     await Promise.all(Array.from({ length: concurrency }, () => worker()));
   }
   
   /**
    * Build the `log` callback for `Pulsar.Client`. Each client log line is
    * forwarded to the console method that matches its level. Lines whose level
    * is not DEBUG, INFO, WARN or ERROR are dropped.
    */
   function logconfig() {
     const sinks = new Map<unknown, (text: unknown) => void>([
       [Pulsar.LogLevel.DEBUG, (text) => console.debug(text)],
       [Pulsar.LogLevel.INFO, (text) => console.info(text)],
       [Pulsar.LogLevel.WARN, (text) => console.warn(text)],
       [Pulsar.LogLevel.ERROR, (text) => console.error(text)],
     ]);

     return (level: any, _file: any, _line: any, message: any) => {
       const sink = sinks.get(level);
       if (sink !== undefined) {
         sink(message);
       }
     };
   }
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to