Hi, Qiang Huang.
This is a good proposal to solve the seek issue of readers. Overall
looks good to me. Left some comments here. Thanks.

> > - stage 1: Check the current cursor status when handling flowPermits from
> > the server side.

Could you explain more details on this step? It looks like there is
not much described above. What kind of status needs to be checked, and
what kind of behavior will the broker take?

> > 1. Consumer reconnect need reset epoch.

Why do we need to reset the epoch when the consumer reconnects?

Thanks!

Zike Yang

On Tue, Jul 26, 2022 at 11:51 AM Anon Hxy <anonhx...@gmail.com> wrote:
>
> +1, Good work.
>
> Thanks,
> Xiaoyu Hou
>
> Qiang Huang <qiang.huang1...@gmail.com> 于2022年7月24日周日 22:25写道:
>
> > Hi Pulsar community:
> > I open a pip to discuss "Pulsar client: seek command add epoch"
> > Proposal Link:
> >
> >    - issue link: https://github.com/apache/pulsar/issues/16757
> >
> > --
> > ## Motivation
> > `Reader` belongs to exclusive subscription type, and it uses `nonDurable`
> > cursor. After receiving messages, `Reader` will ack cumulatively
> > immediately.
> > The `flowPermits` are triggered in multiple scenarios from the client side
> > and it is isolated from `seek` of `Consumer`. Therefore, it is possibile
> > that `flowPermits` will execute after `seek` from the client side, like the
> > following flow chart.
> >
> > [image: image.png]
> >
> > When `handleSeek` processing is delay from the server side, the
> > `MarkDelete position` is modified in a wrong way.
> > The expected result is that `Reader`can re-consume messages from `mark
> > delete:(1,1)` after `seek`. But it doesn't work.
> >
> > Pulsar read message and seek position is not a synchronous operation, the
> > seek request can't prevent an in-process entry reading operation. The
> > client-side also has an opportunity to receive messages after the seek
> > position.
> >
> > Pulsar client make read messages operation and seek position operation
> > synchronized so add an epoch into server and client consumer.  After client
> > reader consumer invoke `seek` , the epoch increase 1 and send `seek`
> >  command carry the epoch and then server consumer will update the epoch.
> > When dispatcher messages to client will carry the epoch which the cursor
> > read at the time. Client consumer will filter the send messages command
> > which is smaller than current epoch.
> > In this way, after the client consumer send `seek` command successfully,
> > because it has passed the epoch filtering, the consumer will not receive a
> > message with a messageID greater than the user previously seek position.
> >
> >
> > ### Current implementation details
> > #### CommandSeek Protocal
> > ```proto
> > // Reset an existing consumer to a particular message id
> > message CommandSeek {
> >     required uint64 consumer_id = 1;
> >     required uint64 request_id  = 2;
> >
> >     optional MessageIdData message_id = 3;
> >     optional uint64 message_publish_time = 4;
> > }
> > ```
> > ### CommandMessage
> > ```proto
> > message CommandMessage {
> >     required uint64 consumer_id       = 1;
> >     required MessageIdData message_id = 2;
> >     optional uint32 redelivery_count  = 3 [default = 0];
> >     repeated int64 ack_set = 4;
> >     optional uint64 epoch = 5 [default = 0];
> > }
> > ```
> > `CommandMessage` already add epoch by [PIP-84](
> > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch)
> > , when client receive `CommandMessage` will compare the command epoch and
> > local epoch to handle this command.
> >
> > ## Goal
> > Add epoch into seek command.
> >
> > ## API Changes
> > ### Protocal change: CommandSeek
> > ```proto
> > // Reset an existing consumer to a particular message id
> > message CommandSeek {
> >     required uint64 consumer_id = 1;
> >     required uint64 request_id  = 2;
> >
> >     optional MessageIdData message_id = 3;
> >     optional uint64 message_publish_time = 4;
> >     optional uint64 consumer_epoch = 5;
> > }
> > ```
> > `CommandSeek` command add epoch field, when client send seek command to
> > server successfully, the server will change the server consumer epoch to
> > the command epoch. The epoch only can bigger than the old epoch in server.
> > Now the client can filter out the message which contains less consumer
> > epoch.
> >
> > ## Implementation
> > - stage 1: Check the current cursor status when handling flowPermits from
> > the server side.
> > - stage 2: Add epoch into seek command, and server update the consumer
> > epoch. It can prevent an in-process entry reading operation after the seek
> > request.
> >
> > ## Reject Alternatives
> > None yet.
> >
> > ## Note
> > 1. Consumer reconnect need reset epoch.
> >
> > --
> > BR,
> > Qiang Huang
> >

Reply via email to