Hi Hsekar,

If the message-handling logic is asynchronous and doesn't need many CPU cycles, a single listener thread per consumer is not a big problem. But if the logic is synchronous, it's better to make it asynchronous, or to maintain a thread pool that runs the message-handling logic to increase parallelism.
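The thread-pool option above could look roughly like the sketch below. It uses only the JDK and does not touch the Pulsar client: a single-thread executor stands in for the one listener thread a consumer gets, and the class name ListenerOffloadSketch, the method process, and the 5 ms sleep are all made up for illustration. In a real consumer, the hand-off would sit inside the messageListener callback, and the message would be acknowledged only after the worker has finished with it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from the Pulsar client.
public class ListenerOffloadSketch {

    // Pushes `messageCount` simulated messages through one "listener" thread
    // that hands each message to a worker pool, waits until all of them have
    // been handled, and returns the names of the worker threads that ran them.
    static Set<String> process(int messageCount, int workers) throws InterruptedException {
        // Stands in for the single listener thread assigned to a consumer.
        ExecutorService listener = Executors.newSingleThreadExecutor();
        // Pool that runs the (blocking) message-handling logic in parallel.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Set<String> workerThreads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messageCount);
        try {
            for (int i = 0; i < messageCount; i++) {
                // The listener thread only hands the message over and returns,
                // so it is free to take the next message right away.
                listener.execute(() -> pool.execute(() -> {
                    try {
                        Thread.sleep(5); // simulated synchronous handling work
                        workerThreads.add(Thread.currentThread().getName());
                        // A real consumer would acknowledge the message here.
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                }));
            }
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for workers");
            }
            return workerThreads;
        } finally {
            listener.shutdown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> used = process(64, 4);
        System.out.println("worker threads used: " + used.size());
    }
}
```

One trade-off of this pattern: messages are no longer handled in arrival order, so it fits best when ordering does not matter (for example, with a Shared subscription).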
What usually affects how fast a consumer reads messages is the consumer's receiverQueueSize. The default is 1000. If you want to achieve higher throughput, such as 100k msgs/sec, it's better to increase the receiver queue size.

BTW, there is a white paper (https://streamnative.io/whitepaper/taking-a-deep-dive-into-apache-pulsar-architecture-for-performance-tuning) that explores the factors affecting performance in Pulsar. Hope it helps.

Thanks,
Penghui
On Nov 5, 2020, 10:08 PM +0800, Rakesh Nair <[email protected]>, wrote:
> Hey Penghui,
> Then what might be the best approach to attain maximum throughput when
> consuming numerous messages - maintaining multiple consumers with shared
> subscription? But wouldn't this create unnecessary broker communication
> overhead?
>
> I'm open to other suggestions too for attaining maximum throughput..
>
> --
> Thanks,
> Hsekar Rian.
>
> > On Thu, Nov 5, 2020 at 7:20 PM PengHui Li <[email protected]> wrote:
> > > Hi Hsekar,
> > >
> > > Sorry, I did not describe it clearly. The client picks a thread from the
> > > listener threads in a round-robin pattern and then assigns this
> > > thread to a consumer for handling the incoming messages.
> > > So for a consumer, all incoming messages are handled by a single thread,
> > > and if the consumer count is greater than the listener thread count, the
> > > same thread will be assigned to different consumers.
> > >
> > > Thanks,
> > > Penghui
> > > On Nov 5, 2020, 9:07 PM +0800, Rakesh Nair <[email protected]>,
> > > wrote:
> > > > Hey Penghui,
> > > > Could you take a look at this code snippet.
> > > >
> > > > int numconsumers = 1;
> > > > PulsarClient client = PulsarClient.builder()
> > > >         .serviceUrl("pulsar://localhost:6650")
> > > >         .statsInterval(5, TimeUnit.SECONDS)
> > > >         .listenerThreads(8)
> > > >         .build();
> > > > final List<Consumer<?>> consumers = new ArrayList<>();
> > > > for (int i = 0; i < numconsumers; i++) {
> > > >     consumers.add(createConsumerWithLister(client, "my-topic",
> > > >             "my-subscription", "C" + i));
> > > > }
> > > > System.out.println("Started...");
> > > >
> > > > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > > >     for (Consumer<?> consumer : consumers) {
> > > >         try {
> > > >             consumer.close();
> > > >         } catch (PulsarClientException e) {
> > > >             e.printStackTrace();
> > > >         }
> > > >     }
> > > > }));
> > > >
> > > > private static Consumer<byte[]> createConsumerWithLister(
> > > >         final PulsarClient client, final String topic,
> > > >         final String subscription, final String consumerName)
> > > >         throws PulsarClientException {
> > > >     return client.newConsumer()
> > > >             .topic(topic)
> > > >             .consumerName(consumerName)
> > > >             .subscriptionName(subscription)
> > > >             .subscriptionMode(SubscriptionMode.Durable)
> > > >             .subscriptionType(SubscriptionType.Shared)
> > > >             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
> > > >             .messageListener((consumer, msg) -> {
> > > >                 System.out.printf(
> > > >                         "[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
> > > >                         consumerName, Thread.currentThread().getName(),
> > > >                         msg.getKey(), msg.getValue(), msg.getTopicName(),
> > > >                         msg.getMessageId().toString());
> > > >                 consumer.acknowledgeAsync(msg);
> > > >             })
> > > >             .subscribe();
> > > > }
> > > >
> > > > Upon execution, I noticed that all the messages were getting processed
> > > > by the same listener thread only. This opposes the fact that setting
> > > > ClientBuilder.listenerThreads(8) and initializing 1 consumer results in
> > > > all the 8 listener threads being used in processing the incoming
> > > > message.
> > > > Could you clarify on this?
> > > > Is it some configuration mistake?
> > > >
> > > > --
> > > > Thanks,
> > > > Hsekar Rian.
> > > >
> > > > > On Thu, Nov 5, 2020 at 2:59 PM PengHui Li <[email protected]>
> > > > > wrote:
> > > > > > > Thanks for the quick reply. So, just confirming, if I set up
> > > > > > > ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1
> > > > > > > producer, then all the 8 threads would be used for processing the
> > > > > > > incoming messages in a round-robin fashion. Is my understanding
> > > > > > > correct?
> > > > > >
> > > > > > Yes.
> > > > > >
> > > > > > > Secondly, I'm trying to understand the
> > > > > > > producerBuilder.createAsync() & messageBuilder.sendAsync()
> > > > > > > concept. I understand that sendAsync() drops the message into
> > > > > > > the producer queue. Does a separate thread pull the messages from
> > > > > > > the queue and then send it to the broker? If so, is there a way
> > > > > > > to configure the number of these threads? I'm trying to
> > > > > > > understand how best to maintain the producer to get maximum
> > > > > > > publish throughput?
> > > > > >
> > > > > > sendAsync() sends the message immediately and leaves a
> > > > > > pending callback in the queue. After the broker returns the send
> > > > > > response to the client, the client removes the pending callback.
> > > > > > This step is done in the IO thread.
> > > > > > You can change the default number of IO threads by using
> > > > > > clientBuilder.ioThreads(int numIoThreads). Generally speaking, this
> > > > > > does not require a lot of threads; the general recommendation is
> > > > > > the number of your CPU cores.
> > > > > >
> > > > > > Thanks,
> > > > > > Penghui
> > > > > > On Nov 5, 2020, 4:30 PM +0800, [email protected], wrote:
> > > > > > >
> > > > > > > Thanks for the quick reply.
> > > > > > > So, just confirming, if I set up
> > > > > > > ClientBuilder.listenerThreads(8) and initialize 1 consumer and 1
> > > > > > > producer, then all the 8 threads would be used for processing the
> > > > > > > incoming messages in a round-robin fashion. Is my understanding
> > > > > > > correct?
> > > >
