+1, good idea!

Thanks,
Zixuan



Qiang Huang <qiang.huang1...@gmail.com> wrote on Sunday, July 24, 2022 at 22:25:

> Hi Pulsar community,
> I have opened a PIP to discuss "Pulsar client: seek command add epoch".
> Proposal Link:
>
>    - issue link: https://github.com/apache/pulsar/issues/16757
>
> --
> ## Motivation
> `Reader` uses the exclusive subscription type and a `nonDurable` cursor.
> After receiving messages, `Reader` acknowledges them cumulatively right
> away.
> `flowPermits` is triggered in multiple scenarios on the client side and is
> independent of the `seek` of `Consumer`. It is therefore possible for
> `flowPermits` to be executed after `seek` on the client side, as in the
> following flow chart.
>
> [image: image.png]
>
> When `handleSeek` processing is delayed on the server side, the
> `MarkDelete position` is modified incorrectly.
> The expected result is that `Reader` can re-consume messages from
> `mark delete:(1,1)` after `seek`, but this does not happen.
>
> In Pulsar, reading messages and seeking to a position are not synchronized
> operations: a seek request cannot stop an entry read that is already in
> progress. As a result, the client can still receive messages after the
> seek position.
>
> To synchronize the read and seek operations, an epoch is added to both the
> server-side consumer and the client-side consumer. When the client's
> reader consumer invokes `seek`, it increases its epoch by 1 and sends a
> `seek` command carrying the new epoch; the server-side consumer then
> updates its epoch. Messages dispatched to the client carry the epoch that
> was current when the cursor read them. The client consumer filters out any
> message command whose epoch is smaller than its current epoch.
> In this way, once the client consumer has sent the `seek` command
> successfully, epoch filtering ensures that it will not receive a message
> with a message ID greater than the position the user previously sought to.
>
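
To make the client-side filtering described above concrete, here is a minimal sketch. It is not the actual Pulsar client code: the class `EpochFilteringConsumer` and its methods are hypothetical names chosen for illustration, and message payloads are simplified to strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of client-side epoch filtering.
// None of these names come from the Pulsar code base.
class EpochFilteringConsumer {
    // Local consumer epoch; increased by 1 on every seek.
    private long consumerEpoch = 0;
    private final List<String> delivered = new ArrayList<>();

    // Called when the application invokes seek(). The returned epoch is the
    // value that would be carried in the seek command sent to the server.
    synchronized long seek() {
        delivered.clear(); // assumption: messages buffered before the seek are dropped
        consumerEpoch += 1;
        return consumerEpoch;
    }

    // Called for each incoming message command. Returns true if the message
    // is accepted, false if it is filtered out as stale.
    synchronized boolean onMessage(long messageEpoch, String payload) {
        if (messageEpoch < consumerEpoch) {
            return false; // read by the server before the seek took effect
        }
        delivered.add(payload);
        return true;
    }

    synchronized List<String> deliveredMessages() {
        return new ArrayList<>(delivered);
    }
}
```

In this sketch, a message stamped with epoch 0 that arrives after `seek()` has returned epoch 1 is rejected, while a message stamped with epoch 1 is accepted.
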
>
> ### Current implementation details
> #### CommandSeek Protocol
> ```proto
> // Reset an existing consumer to a particular message id
> message CommandSeek {
>     required uint64 consumer_id = 1;
>     required uint64 request_id  = 2;
>
>     optional MessageIdData message_id = 3;
>     optional uint64 message_publish_time = 4;
> }
> ```
> #### CommandMessage
> ```proto
> message CommandMessage {
>     required uint64 consumer_id       = 1;
>     required MessageIdData message_id = 2;
>     optional uint32 redelivery_count  = 3 [default = 0];
>     repeated int64 ack_set = 4;
>     optional uint64 epoch = 5 [default = 0];
> }
> ```
> [PIP-84](
> https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch)
> already added an epoch to `CommandMessage`. When the client receives a
> `CommandMessage`, it compares the command's epoch with its local epoch to
> decide how to handle the command.
>
> ## Goal
> Add an epoch to the seek command.
>
> ## API Changes
> ### Protocol change: CommandSeek
> ```proto
> // Reset an existing consumer to a particular message id
> message CommandSeek {
>     required uint64 consumer_id = 1;
>     required uint64 request_id  = 2;
>
>     optional MessageIdData message_id = 3;
>     optional uint64 message_publish_time = 4;
>     optional uint64 consumer_epoch = 5;
> }
> ```
> An epoch field is added to `CommandSeek`. When the client sends a seek
> command to the server successfully, the server sets the server-side
> consumer epoch to the command's epoch. The new epoch can only be larger
> than the old epoch on the server. The client can then filter out messages
> that carry a smaller consumer epoch.
>
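
The server-side rule above (the epoch may only grow) could be sketched as follows. This is an illustration only: `ServerConsumerEpoch`, `onSeek`, and `currentEpoch` are hypothetical names, and ignoring a seek whose epoch is not larger is an assumption made for the sketch rather than something the proposal specifies.

```java
// Hypothetical server-side consumer state; not Pulsar's actual class.
class ServerConsumerEpoch {
    private long epoch = 0;

    // Applies the epoch carried by a seek command (the proposed
    // consumer_epoch field). Returns true if the stored epoch was updated.
    synchronized boolean onSeek(long commandEpoch) {
        if (commandEpoch <= epoch) {
            // Assumption: a command with a non-larger epoch leaves the state unchanged.
            return false;
        }
        epoch = commandEpoch;
        return true;
    }

    // Epoch that would be stamped on messages dispatched to the client.
    synchronized long currentEpoch() {
        return epoch;
    }
}
```

With this rule, a delayed seek command carrying an older epoch cannot move the server-side epoch backwards.
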
> ## Implementation
> - stage 1: Check the current cursor status when handling flowPermits on
> the server side.
> - stage 2: Add an epoch to the seek command and have the server update the
> consumer epoch. This guards against an entry read that is still in
> progress after the seek request.
>
> ## Rejected Alternatives
> None yet.
>
> ## Note
> 1. The epoch needs to be reset when the consumer reconnects.
>
> --
> BR,
> Qiang Huang
>
