Hi Asaf,
thanks for your reminder.

## Changing
I have updated the following changes to make sure the notification arrived
successfully:
1. The watch success response `CommandWatchPartitionUpdateSuccess` will
contain all the concerned topics of this watcher
2. The notification `CommandPartitionUpdate` will always contain all the
concerned topics of this watcher.
3. The notification `CommandPartitionUpdate`contains a monotonically
increased version.
4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
Pair<version, long/*timestamp*/>>` will keep track of the updating
5. A timer will check the updating timeout through `inFlightUpdate`
6. The client acks `CommandPartitionUpdateResult` to broker when it
finishes updating.

## Details

The following mechanism could make sure the newest notification arrived
successfully, copying the description from GH:

A new class, `org.apache.pulsar.PartitonUpdateWatcherService` will keep
track of watchers and will listen to the changes in the metadata. Whenever
a topic partition updates it checks if any watchers should be notified and
sends an update for all topics the watcher concerns through the ServerCnx.
Then we will record this request into a map,
`PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/, Pair<version,
long/*timestamp*/>>`.  A timer will check this update timeout through
inFlightUpdate .  We will query all the concerned topics's partition if
this watcher has sent an update timeout and will resend it.

The client acks `CommandPartitionUpdateResult` to broker when it finishes
updating.  The broker handle `CommandPartitionUpdateResult` request:
 - If CommandPartitionUpdateResult#version <
PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, broker
ignores this ack.
 -  If CommandPartitionUpdateResult#version ==
PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version
    - If CommandPartitionUpdateResult#success is true,  broker just removes
the watcherID from inFlightUpdate.
    - If CommandPartitionUpdateResult#success is false,  broker removes the
watcherId from inFlightUpdate, and queries all the concerned topics's
partition and resend.
 - If CommandPartitionUpdateResult#version >
PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this
should not happen.

 ## Edge cases
- Broker restarts or crashes
Client will reconnect to another broker, broker responses
`CommandWatchPartitionUpdateSuccess` with watcher concerned topics's
partitions.  We will call `PartitionsUpdateListener` if the connection
opens.
- Client acks fail or timeout
Broker will resend the watcher concerned topics's partitions either client
acks fail or acks timeout.
- Partition updates before client acks.
`CommandPartitionUpdate#version` monotonically increases every time it is
updated. If Partition updates before client acks, a greater version will be
put into `PartitonUpdateWatcherService#inFlightUpdate`.  The previous acks
will be ignored because the version is less than the current version.


Asaf Mesika <asaf.mes...@gmail.com> 于2023年2月22日周三 21:33写道:

> How about edge cases?
> In Andra's PIP he took into account cases where updates were lost, so he
> created a secondary poll. Not saying it's the best situation for your case
> of course.
> I'm saying that when a broker sends an update CommandPartitionUpdate, how
> do you know it arrived successfully? From my memory, there is no ACK in the
> protocol, saying "I'm the client, I got the update successfully" and only
> then it removed the "dirty" flag for that topic, for this watcher ID.
>
> Are there any other edge cases we can have? Let's be exhaustive.
>
>
>
> On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
>
> > Thanks for your great suggestion Enrico.
> >
> > I agreed with you. It's more reasonable to add a
> > `supports_partition_update_watchers`  in `FeatureFlags`  to detect that
> the
> > connected broker supporting this feature , and add a new broker
> > configuration property `enableNotificationForPartitionUpdate` with
> default
> > value true, which is much like PIP-145.
> >
> > I have updated the descriptions.
> >
> > Enrico Olivelli <eolive...@gmail.com> 于2023年2月22日周三 17:26写道:
> >
> > > I support this proposal.
> > > Coping here my comments from GH:
> > >
> > > can't we enable this by default in case we detect that the connected
> > > Broker supports it ?
> > > I can't find any reason for not using this mechanism if it is
> available.
> > >
> > > Maybe we can set the default to "true" and allow users to disable it
> > > in case it impacts their systems in an unwanted way.
> > >
> > > Maybe It would be useful to have a way to disable the mechanism on the
> > > broker side as well
> > >
> > > Enrico
> > >
> > > Il giorno mer 22 feb 2023 alle ore 10:22 houxiaoyu
> > > <anonhx...@gmail.com> ha scritto:
> > > >
> > > > Hi Pulsar community:
> > > >
> > > > I opened a PIP to discuss "Notifications for partitions update"
> > > >
> > > > ### Motivation
> > > >
> > > > Pulsar client will poll brokers at fix time for checking the
> partitions
> > > > update if we publish/subscribe the partitioned topics with
> > > > `autoUpdatePartitions` as true. This causes unnecessary load for
> both
> > > > clients and brokers since most of the time the number of partitions
> > will
> > > > not change. In addition polling introduces latency in partitions
> update
> > > >  which is specified by `autoUpdatePartitionsInterval`.
> > > > This PIP would like to introduce a notification mechanism for
> partition
> > > > update, which is much like PIP-145 for regex subscriptions
> > > > https://github.com/apache/pulsar/issues/14505.
> > > >
> > > > For more details, please read the PIP at:
> > > > https://github.com/apache/pulsar/issues/19596
> > > > Looking forward to hearing your thoughts.
> > > >
> > > > Thanks,
> > > > Xiaoyu Hou
> > > > ----
> > >
> >
>

Reply via email to