> Just the way to implements partitioned-topic metadata
> notification mechanism is much like notifications on regex sub changes

Why do we need a separate notification implementation? The regex
subscription feature is about discovering topics (not subscriptions)
that match a regular expression. As Joe mentioned, I think we just
need the client to "subscribe" to a topic notification for
"<topic-name>-partition-[0-9]+" to eliminate the polling.

Building on PIP 145, the work for this PIP would be in implementing a
different `TopicsChangedListener` [1] so that the result of an added
topic is to add a producer/consumer to the new partition.

I support removing polling in our streaming platform, but I'd prefer
to limit the number of notification systems we implement.

Thanks,
Michael

[0] https://github.com/apache/pulsar/pull/16062
[1] 
https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175



On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
>
> Hi Joe,
>
> When we use PartitionedProducerImpl or MultiTopicsConsumerImpl,  there is a
> poll task to fetch the metadata of the partitioned-topic regularly for the
> number of partitions updated.  This PIP wants to use a
> notification mechanism to replace the metadata poll task.
>
> Just the way to implements partitioned-topic metadata
> notification mechanism is much like notifications on regex sub changes
>
> Joe F <joefranc...@gmail.com> 于2023年2月24日周五 13:37写道:
>
> > Why is this needed when we have notifications on regex sub changes? Aren't
> > the partition names a well-defined regex?
> >
> > Joe
> >
> > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
> >
> > > Hi Asaf,
> > > thanks for your reminder.
> > >
> > > ## Changing
> > > I have updated the following changes to make sure the notification
> > arrived
> > > successfully:
> > > 1. The watch success response `CommandWatchPartitionUpdateSuccess` will
> > > contain all the concerned topics of this watcher
> > > 2. The notification `CommandPartitionUpdate` will always contain all the
> > > concerned topics of this watcher.
> > > 3. The notification `CommandPartitionUpdate`contains a monotonically
> > > increased version.
> > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > Pair<version, long/*timestamp*/>>` will keep track of the updating
> > > 5. A timer will check the updating timeout through `inFlightUpdate`
> > > 6. The client acks `CommandPartitionUpdateResult` to broker when it
> > > finishes updating.
> > >
> > > ## Details
> > >
> > > The following mechanism could make sure the newest notification arrived
> > > successfully, copying the description from GH:
> > >
> > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService` will keep
> > > track of watchers and will listen to the changes in the metadata.
> > Whenever
> > > a topic partition updates it checks if any watchers should be notified
> > and
> > > sends an update for all topics the watcher concerns through the
> > ServerCnx.
> > > Then we will record this request into a map,
> > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > Pair<version,
> > > long/*timestamp*/>>`.  A timer will check this update timeout through
> > > inFlightUpdate .  We will query all the concerned topics's partition if
> > > this watcher has sent an update timeout and will resend it.
> > >
> > > The client acks `CommandPartitionUpdateResult` to broker when it finishes
> > > updating.  The broker handle `CommandPartitionUpdateResult` request:
> > >  - If CommandPartitionUpdateResult#version <
> > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > broker
> > > ignores this ack.
> > >  -  If CommandPartitionUpdateResult#version ==
> > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version
> > >     - If CommandPartitionUpdateResult#success is true,  broker just
> > removes
> > > the watcherID from inFlightUpdate.
> > >     - If CommandPartitionUpdateResult#success is false,  broker removes
> > the
> > > watcherId from inFlightUpdate, and queries all the concerned topics's
> > > partition and resend.
> > >  - If CommandPartitionUpdateResult#version >
> > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version, this
> > > should not happen.
> > >
> > >  ## Edge cases
> > > - Broker restarts or crashes
> > > Client will reconnect to another broker, broker responses
> > > `CommandWatchPartitionUpdateSuccess` with watcher concerned topics's
> > > partitions.  We will call `PartitionsUpdateListener` if the connection
> > > opens.
> > > - Client acks fail or timeout
> > > Broker will resend the watcher concerned topics's partitions either
> > client
> > > acks fail or acks timeout.
> > > - Partition updates before client acks.
> > > `CommandPartitionUpdate#version` monotonically increases every time it is
> > > updated. If Partition updates before client acks, a greater version will
> > be
> > > put into `PartitonUpdateWatcherService#inFlightUpdate`.  The previous
> > acks
> > > will be ignored because the version is less than the current version.
> > >
> > >
> > > Asaf Mesika <asaf.mes...@gmail.com> 于2023年2月22日周三 21:33写道:
> > >
> > > > How about edge cases?
> > > > In Andra's PIP he took into account cases where updates were lost, so
> > he
> > > > created a secondary poll. Not saying it's the best situation for your
> > > case
> > > > of course.
> > > > I'm saying that when a broker sends an update CommandPartitionUpdate,
> > how
> > > > do you know it arrived successfully? From my memory, there is no ACK in
> > > the
> > > > protocol, saying "I'm the client, I got the update successfully" and
> > only
> > > > then it removed the "dirty" flag for that topic, for this watcher ID.
> > > >
> > > > Are there any other edge cases we can have? Let's be exhaustive.
> > > >
> > > >
> > > >
> > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > > >
> > > > > Thanks for your great suggestion Enrico.
> > > > >
> > > > > I agreed with you. It's more reasonable to add a
> > > > > `supports_partition_update_watchers`  in `FeatureFlags`  to detect
> > that
> > > > the
> > > > > connected broker supporting this feature , and add a new broker
> > > > > configuration property `enableNotificationForPartitionUpdate` with
> > > > default
> > > > > value true, which is much like PIP-145.
> > > > >
> > > > > I have updated the descriptions.
> > > > >
> > > > > Enrico Olivelli <eolive...@gmail.com> 于2023年2月22日周三 17:26写道:
> > > > >
> > > > > > I support this proposal.
> > > > > > Coping here my comments from GH:
> > > > > >
> > > > > > can't we enable this by default in case we detect that the
> > connected
> > > > > > Broker supports it ?
> > > > > > I can't find any reason for not using this mechanism if it is
> > > > available.
> > > > > >
> > > > > > Maybe we can set the default to "true" and allow users to disable
> > it
> > > > > > in case it impacts their systems in an unwanted way.
> > > > > >
> > > > > > Maybe It would be useful to have a way to disable the mechanism on
> > > the
> > > > > > broker side as well
> > > > > >
> > > > > > Enrico
> > > > > >
> > > > > > Il giorno mer 22 feb 2023 alle ore 10:22 houxiaoyu
> > > > > > <anonhx...@gmail.com> ha scritto:
> > > > > > >
> > > > > > > Hi Pulsar community:
> > > > > > >
> > > > > > > I opened a PIP to discuss "Notifications for partitions update"
> > > > > > >
> > > > > > > ### Motivation
> > > > > > >
> > > > > > > Pulsar client will poll brokers at fix time for checking the
> > > > partitions
> > > > > > > update if we publish/subscribe the partitioned topics with
> > > > > > > `autoUpdatePartitions` as true. This causes unnecessary load for
> > > > both
> > > > > > > clients and brokers since most of the time the number of
> > partitions
> > > > > will
> > > > > > > not change. In addition polling introduces latency in partitions
> > > > update
> > > > > > >  which is specified by `autoUpdatePartitionsInterval`.
> > > > > > > This PIP would like to introduce a notification mechanism for
> > > > partition
> > > > > > > update, which is much like PIP-145 for regex subscriptions
> > > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > > >
> > > > > > > For more details, please read the PIP at:
> > > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > > Looking forward to hearing your thoughts.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Xiaoyu Hou
> > > > > > > ----
> > > > > >
> > > > >
> > > >
> > >
> >

Reply via email to