Hi Asaf,

>  Prefer something more explicit: onTopicPartitionsCountChanged
> Maybe we should mark "PartitionsChangedListener" deprecated?
> 60 seconds seems like *a lot* to miss an update. Maybe 5-10 sec

These all look good to me.
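To make the rename concrete, here is a rough sketch of what the new listener could look like (the `PartitionsUpdateListener` name is from the PIP; the method signature and parameters are only illustrative, not the final API):

```java
// Illustrative sketch only: the more explicit callback name suggested in
// this thread. The existing PartitionsChangedListener would just be marked
// @Deprecated rather than removed.
public interface PartitionsUpdateListener {
    // Invoked when the broker notifies the client that the partition count
    // of a watched partitioned topic has changed.
    void onTopicPartitionsCountChanged(String topic, int newPartitionCount);
}
```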

> Let's think about the case of endless retries

We will remove the watcher from the broker when `channelInactive` is
invoked, as PIP-145 does. If we do so, I think endless retries should not
happen.
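As an illustration of that cleanup, a minimal sketch (the helper names are hypothetical; only the `channelInactive` hook and the `PartitonUpdateWatcherService` class from the PIP are assumed):

```java
// Sketch only, with hypothetical helper names: broker-side cleanup mirroring
// PIP-145, where watchers bound to a connection are dropped once the
// connection goes inactive, so the broker never keeps retrying
// CommandPartitionUpdate against a dead client.
interface PartitonUpdateWatcherService {
    void removeWatchersForConnection(long connectionId);
}

class PartitionUpdateCnxCleanup {
    private final PartitonUpdateWatcherService watcherService;

    PartitionUpdateCnxCleanup(PartitonUpdateWatcherService watcherService) {
        this.watcherService = watcherService;
    }

    // Intended to be called from the broker's channelInactive hook (e.g. in
    // ServerCnx), analogous to PIP-145's topic-list watcher cleanup.
    void onChannelInactive(long connectionId) {
        watcherService.removeWatchersForConnection(connectionId);
    }
}
```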

> Can you elaborate on which broker you'll be doing that conversation with?
> Partitioned topics are spread across several brokers, so how can one broker
> know the count of the partitioned topic?

The client-side `PartitionUpdateWatcher` will extend `HandlerState`, so it
can obtain a connection to any broker. Every broker knows the partition
count of a partitioned topic through the metadata store, and it can also
learn about partition-change events by watching the metadata store.
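To make that concrete, a minimal client-side sketch (apart from the `PartitionUpdateWatcher` idea and reusing `HandlerState`'s connection handling, everything here, including the callback wiring, is an illustrative assumption):

```java
// Sketch only: a client-side watcher that reacts to CommandPartitionUpdate
// pushed by whichever broker the watch is registered on. That broker learns
// about changes by watching the metadata store, so any broker can serve it.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

class PartitionUpdateWatcherSketch {
    // Hypothetical callback, e.g. wired to producers/consumers that need to
    // add partition handlers when the count grows.
    private final BiConsumer<String, Integer> onPartitionsCountChanged;
    private final Map<String, Integer> knownCounts = new ConcurrentHashMap<>();

    PartitionUpdateWatcherSketch(BiConsumer<String, Integer> onPartitionsCountChanged) {
        this.onPartitionsCountChanged = onPartitionsCountChanged;
    }

    // Would be invoked for each CommandPartitionUpdate notification; only a
    // real change in the partition count is propagated to the listener.
    void handlePartitionUpdate(String topic, int partitions) {
        Integer previous = knownCounts.put(topic, partitions);
        if (previous == null || previous != partitions) {
            onPartitionsCountChanged.accept(topic, partitions);
        }
    }
}
```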

Thanks,
Xiaoyu Hou


Asaf Mesika <asaf.mes...@gmail.com> wrote on Mon, Feb 27, 2023 at 00:31:

>  Michael,
>
> I think the currently suggested protocol is more optimized than the
> existing regex mechanism:
>
>    - If the broker sends a notification and it's lost due to network
>    issues, you'll only know about it because the client keeps polling
>    constantly, using its hash to minimize the response.
>       - In the suggested mechanism, there's no constant polling at all.
>       Just think of introducing polling to *all* multi-partition
>       producers and consumers (which is likely all Pulsar users). The
>       suggested mechanism has a configured timeout allowing it to resend
>       on timeout, but only when delivery fails, as opposed to constant
>       polling mechanisms.
>    - On each metadata update you'd need to run the regular expression
>    against all topics hosted on the broker, for any given client. That's
>    a lot of CPU.
>       - The suggested mechanism mainly cares about the count of
>       partitions, so it's a lot more efficient.
>
>
> Xiaoyu,
>
> I don't like the "onTopicExtend" method name. Prefer something more
> explicit: onTopicPartitionsCountChanged.
>
> Let's think about the case of endless retries - you know, the client is
> acting out and never acknowledges. We need to think about it.
> 60 seconds seems like *a lot* to miss an update. Maybe 5-10 sec?
>
> Can you elaborate on which broker you'll be doing that conversation with?
> Partitioned topics are spread across several brokers, so how can one broker
> know the count of the partitioned topic?
>
> Maybe we should mark "PartitionsChangedListener" deprecated? Having two
> interfaces is confusing.
>
>
>
> On Fri, Feb 24, 2023 at 7:29 PM Michael Marshall <mmarsh...@apache.org>
> wrote:
>
> > > It's just that the way to implement the partitioned-topic metadata
> > > notification mechanism is much like the notifications on regex sub changes.
> >
> > Why do we need a separate notification implementation? The regex
> > subscription feature is about discovering topics (not subscriptions)
> > that match a regular expression. As Joe mentioned, I think we just
> > need the client to "subscribe" to a topic notification for
> > "<topic-name>-partition-[0-9]+" to eliminate the polling.
> >
> > Building on PIP 145, the work for this PIP would be in implementing a
> > different `TopicsChangedListener` [1] so that the result of an added
> > topic is to add a producer/consumer to the new partition.
> >
> > I support removing polling in our streaming platform, but I'd prefer
> > to limit the number of notification systems we implement.
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/apache/pulsar/pull/16062
> > [1]
> >
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
> >
> >
> >
> > On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > > Hi Joe,
> > >
> > > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there
> > > is a poll task that regularly fetches the metadata of the partitioned
> > > topic to check whether the number of partitions has been updated. This
> > > PIP wants to use a notification mechanism to replace the metadata poll
> > > task.
> > >
> > > It's just that the way to implement the partitioned-topic metadata
> > > notification mechanism is much like the notifications on regex sub changes.
> > >
> > > Joe F <joefranc...@gmail.com> wrote on Fri, Feb 24, 2023 at 13:37:
> > >
> > > > Why is this needed when we have notifications on regex sub changes?
> > > > Aren't the partition names a well-defined regex?
> > > >
> > > > Joe
> > > >
> > > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com>
> wrote:
> > > >
> > > > > Hi Asaf,
> > > > > thanks for your reminder.
> > > > >
> > > > > ## Changes
> > > > > I have made the following changes to make sure the notification
> > > > > arrives successfully:
> > > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess`
> > > > > will contain all the concerned topics of this watcher.
> > > > > 2. The notification `CommandPartitionUpdate` will always contain
> > > > > all the concerned topics of this watcher.
> > > > > 3. The notification `CommandPartitionUpdate` contains a
> > > > > monotonically increasing version.
> > > > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight
> > > > > updates.
> > > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
> > > > > 6. The client acks `CommandPartitionUpdateResult` to the broker
> > > > > when it finishes updating.
> > > > >
> > > > > ## Details
> > > > >
> > > > > The following mechanism could make sure the newest notification
> > > > > arrives successfully (copying the description from GH):
> > > > >
> > > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will
> > > > > keep track of watchers and will listen to changes in the metadata.
> > > > > Whenever a topic's partitions update, it checks whether any watchers
> > > > > should be notified and sends an update for all topics the watcher
> > > > > concerns through the ServerCnx. Then we record this request in a map,
> > > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > > Pair<version, long/*timestamp*/>>`. A timer will check for update
> > > > > timeouts through `inFlightUpdate`. If an update sent to a watcher
> > > > > has timed out, we will query the partitions of all the topics that
> > > > > watcher concerns and resend it.
> > > > >
> > > > > The client acks `CommandPartitionUpdateResult` to the broker when
> > > > > it finishes updating. The broker handles the
> > > > > `CommandPartitionUpdateResult` request as follows:
> > > > >  - If CommandPartitionUpdateResult#version <
> > > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > > > > the broker ignores this ack.
> > > > >  - If CommandPartitionUpdateResult#version ==
> > > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
> > > > >     - If CommandPartitionUpdateResult#success is true, the broker
> > > > > just removes the watcherID from inFlightUpdate.
> > > > >     - If CommandPartitionUpdateResult#success is false, the broker
> > > > > removes the watcherID from inFlightUpdate, queries the partitions
> > > > > of all the concerned topics, and resends.
> > > > >  - If CommandPartitionUpdateResult#version >
> > > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > > > > this should not happen.
> > > > >
> > > > > ## Edge cases
> > > > > - Broker restarts or crashes
> > > > > The client will reconnect to another broker, and that broker
> > > > > responds with `CommandWatchPartitionUpdateSuccess` containing the
> > > > > partitions of the topics the watcher concerns. We will call
> > > > > `PartitionsUpdateListener` when the connection opens.
> > > > > - Client acks fail or time out
> > > > > The broker will resend the partitions of the topics the watcher
> > > > > concerns whether the client's ack fails or times out.
> > > > > - Partitions update before the client acks
> > > > > `CommandPartitionUpdate#version` monotonically increases every time
> > > > > it is updated. If the partitions update before the client acks, a
> > > > > greater version will be put into
> > > > > `PartitonUpdateWatcherService#inFlightUpdate`. The previous acks
> > > > > will be ignored because their version is less than the current
> > > > > version.
> > > > >
> > > > >
> > > > > Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Feb 22, 2023 at 21:33:
> > > > >
> > > > > > How about edge cases?
> > > > > > In Andra's PIP he took into account cases where updates were
> > > > > > lost, so he created a secondary poll. Not saying it's the best
> > > > > > situation for your case, of course.
> > > > > > I'm saying that when a broker sends a CommandPartitionUpdate,
> > > > > > how do you know it arrived successfully? From my memory, there
> > > > > > is no ACK in the protocol saying "I'm the client, I got the
> > > > > > update successfully", after which it would remove the "dirty"
> > > > > > flag for that topic, for this watcher ID.
> > > > > >
> > > > > > Are there any other edge cases we can have? Let's be exhaustive.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Thanks for your great suggestion Enrico.
> > > > > > >
> > > > > > > I agree with you. It's more reasonable to add a
> > > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to
> > > > > > > detect that the connected broker supports this feature, and to
> > > > > > > add a new broker configuration property
> > > > > > > `enableNotificationForPartitionUpdate` with a default value of
> > > > > > > true, much like PIP-145.
> > > > > > >
> > > > > > > I have updated the descriptions.
> > > > > > >
> > > > > > > Enrico Olivelli <eolive...@gmail.com> wrote on Wed, Feb 22, 2023 at 17:26:
> > > > > > >
> > > > > > > > I support this proposal.
> > > > > > > > Copying here my comments from GH:
> > > > > > > >
> > > > > > > > Can't we enable this by default in case we detect that the
> > > > > > > > connected broker supports it?
> > > > > > > > I can't find any reason for not using this mechanism if it
> > > > > > > > is available.
> > > > > > > >
> > > > > > > > Maybe we can set the default to "true" and allow users to
> > > > > > > > disable it in case it impacts their systems in an unwanted
> > > > > > > > way.
> > > > > > > >
> > > > > > > > Maybe it would be useful to have a way to disable the
> > > > > > > > mechanism on the broker side as well.
> > > > > > > >
> > > > > > > > Enrico
> > > > > > > >
> > > > > > > > On Wed, Feb 22, 2023 at 10:22 houxiaoyu
> > > > > > > > <anonhx...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi Pulsar community:
> > > > > > > > >
> > > > > > > > > I opened a PIP to discuss "Notifications for partitions
> > update"
> > > > > > > > >
> > > > > > > > > ### Motivation
> > > > > > > > >
> > > > > > > > > The Pulsar client will poll brokers at a fixed interval to
> > > > > > > > > check for partition updates if we publish/subscribe to
> > > > > > > > > partitioned topics with `autoUpdatePartitions` set to true.
> > > > > > > > > This causes unnecessary load for both clients and brokers,
> > > > > > > > > since most of the time the number of partitions will not
> > > > > > > > > change. In addition, polling introduces latency in partition
> > > > > > > > > updates, bounded by `autoUpdatePartitionsInterval`.
> > > > > > > > > This PIP would like to introduce a notification mechanism
> > > > > > > > > for partition updates, much like PIP-145 did for regex
> > > > > > > > > subscriptions:
> > > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > > > > >
> > > > > > > > > For more details, please read the PIP at:
> > > > > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > > > > Looking forward to hearing your thoughts.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Xiaoyu Hou
> > > > > > > > > ----
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> >
>
