Hi Tecno Brain,

> We have observed that some messages are consumed extremely delayed (even when 
> the median age is just a few milliseconds)

It sounds like you might have some consumer applications that are
shutting down without acknowledging some messages. This is not
necessarily a problem, but it likely explains your observations.

> Our app usually starts consuming messages with a large age (redeliveries =0) 
> and then it eventually processes messages with an age of just a few 
> milliseconds.

This is how Pulsar consumers work. For a shared subscription, when a
consumer connects, the broker delivers the oldest unacknowledged
message that is not already delivered to a connected consumer. Note
that when a consumer receives a message, that message will only be
sent to another consumer if one of the following happens:

1. The consumer negatively acknowledges the message. (An ack timeout,
if configured, triggers a negative acknowledgement in the client when
it expires.)
2. The consumer disconnects from the broker. (Note that this can
currently happen due to load balancing.)

When a message is acknowledged, subsequent redelivery would generally
be considered a bug.
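
As a concrete illustration, here is a minimal sketch of the usual
receive/ack pattern (assuming your existing `consumer` and a
hypothetical process() method):

    Message<Payload> msg = consumer.receive();
    try {
        process(msg);                        // hypothetical application logic
        consumer.acknowledge(msg);           // acked: the broker will not redeliver it
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);   // condition 1: eligible for redelivery,
                                             // possibly to a different consumer
    }

If you additionally set an ack timeout on the builder, for example
.ackTimeout(30, TimeUnit.SECONDS), a message that is received but
never acked or nacked is negatively acknowledged for you once that
interval elapses.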

> My guess is that a new consumer is processing the entire history in the 
> topic, looking for messages that had not been acknowledged. (since the 
> initial position is set to Earliest)

If you did not reset the cursor, the new consumer is not going back to
the earliest position. subscriptionInitialPosition only applies when
the subscription is first created; once the shared subscription
exists, a newly connected consumer picks up from the subscription's
existing cursor rather than re-reading the topic's history.

> a) How can I avoid these "gaps" of messages apparently not being consumed 
> timely?  Why are these not being delivered to the consumers already up and 
> running?

My first note is that you'll want to understand how the
receiverQueueSize impacts your workload. The receiverQueueSize is the
number of messages a consumer prefetches and buffers internally.
Unless condition 1 or 2 above is met, these buffered messages cannot
be delivered to another consumer. Note also that the ack timeout does
not start counting until a message is handed to the client application
through a call to "receive". As such, an application that stops
processing messages but is still able to keep the broker connection
alive will hold on to these buffered messages.

If you would like an application to process only one message at a
time, you can set receiverQueueSize to 0. That can decrease processing
efficiency and also requires unbatched messages (batching is a
producer-side setting), which removes a key performance optimization.
Here is some documentation on the setting:
https://pulsar.apache.org/docs/2.10.x/client-libraries-java#configure-consumer.
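
For example, a sketch based on your snippet (the producer side is
shown only to illustrate the no-batching requirement and assumes the
same requestQueue topic):

    consumer = client
        .newConsumer(JSONSchema.of(Payload.class))
        .topic(requestQueue)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("queue-sample")
        .receiverQueueSize(0)   // no prefetch: a message is only dispatched on receive()
        .subscribe();

    // A zero-length receiver queue cannot consume batched messages, so the
    // producer must disable batching:
    producer = client
        .newProducer(JSONSchema.of(Payload.class))
        .topic(requestQueue)
        .enableBatching(false)
        .create();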

In order to debug a bit more, it might be helpful to look at the
topic's "internal stats". You can do this by running one of the
following:

pulsar-admin topics stats-internal <topic>

or

pulsar-admin topics partitioned-stats-internal <topic>

The result will include a map of all cursors (basically
subscriptions) for the topic, and in those maps you'll see
"individuallyDeletedMessages". Those are the ranges of messages that
have been acknowledged out of order, ahead of the cursor's mark-delete
position. The gaps ("holes") between those ranges are messages that
were delivered but never acknowledged, and those holes map to the
messages that are getting redelivered.
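
If it's easier to check programmatically, the same information is
exposed through the admin API. A rough sketch (the service URL and
topic/partition names here are placeholders):

    PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://broker:8080")   // placeholder URL
        .build();

    // For a partitioned topic, inspect each partition, e.g. <topic>-partition-0
    PersistentTopicInternalStats stats =
        admin.topics().getInternalStats("persistent://tenant/ns/request-queue-partition-0");

    stats.cursors.forEach((subscription, cursor) ->
        System.out.println(subscription
            + " markDeletePosition=" + cursor.markDeletePosition
            + " individuallyDeletedMessages=" + cursor.individuallyDeletedMessages));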

> maxTotalReceiverQueueSizeAcrossPartitions(1) ?

This could help, but it will probably increase the latency associated
with calling `receive`.
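
If you do try it, it's just another option on the consumer builder; a
sketch reusing your snippet:

    consumer = client
        .newConsumer(JSONSchema.of(Payload.class))
        .topic(requestQueue)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionName("queue-sample")
        .receiverQueueSize(1)
        .maxTotalReceiverQueueSizeAcrossPartitions(1)   // caps total prefetch across the partitions
        .subscribe();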

- Michael

On Fri, Nov 18, 2022 at 6:50 PM Tecno Brain
<cerebrotecnolog...@gmail.com> wrote:
>
> Would I need to also use
>
> maxTotalReceiverQueueSizeAcrossPartitions(1) ?
>
>
> On Fri, Nov 18, 2022 at 7:42 PM Tecno Brain <cerebrotecnolog...@gmail.com> 
> wrote:
>>
>> Hello All,
>>
>> We have a Pulsar cluster with a topic that uses 3 partitions.
>> We are using Pulsar as a queue, a shared subscription.
>> The number of consumers goes up and down constantly (we look at the size of 
>> the queue to scale up and down our number of consumers)
>>
>> We have observed that some messages are consumed extremely delayed (even 
>> when the median age is just a few milliseconds)
>>
>> The median age of the messages received is usually just a few milliseconds, 
>> with some probably in a few seconds (caused by we need to scale up our 
>> consumer cluster)
>>
>> But some messages are consumed minutes or even hours after they have been 
>> queued.
>>
>> Our app usually starts consuming messages with a large age (redeliveries =0) 
>> and then it eventually processes messages with an age of just a few 
>> milliseconds.
>>
>> I found that whenever the age was very very large (hours),  it was  a 
>> redelivery. (therefore, the large age at least has an explanation)
>>
>> My guess is that a new consumer is processing the entire history in the 
>> topic, looking for messages that had not been acknowledged. (since the 
>> initial position is set to Earliest)
>>
>> a) How can I avoid these "gaps" of messages apparently not being consumed 
>> timely?  Why are these not being delivered to the consumers already up and 
>> running?
>>
>> b) How can I reduce the time taken for a message to be redelivered?
>>
>> Here is how we create the Pulsar Consumer:
>>
>>  consumer =
>>         client
>>             .newConsumer(JSONSchema.of(Payload.class))
>>             .topic(requestQueue)
>>             .subscriptionType(SubscriptionType.Shared)
>>             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>>             .subscriptionName("queue-sample")
>>             .receiverQueueSize(1)
>>             .subscribe();
>>
>> Any pointers are welcome.
>> Thank you
>>
