Hi Michael,

> I think we just need the client to "subscribe" to a topic notification for
> "<topic-name>-partition-[0-9]+" to eliminate the polling

If Pulsar users want to publish to or subscribe to a partitioned topic, I
think most of them would create a simple producer or consumer like the
following:
```
Producer<byte[]> producer = client.newProducer().topic(topic).create();
producer.sendAsync(msg);
```
```
client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscribe();
```
I think there is no reason for users to use `topicsPattern` if they just
want to subscribe to a partitioned topic. In addition, `topicsPattern`
cannot be used for producers.
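
For comparison, the regex approach from the quoted suggestion relies on
partition names matching `<topic-name>-partition-[0-9]+`. Here is a minimal
sketch of that match using only `java.util.regex`. The class
`PartitionNameMatcher`, its methods, and the sample topic names are
hypothetical and only for illustration; they are not part of the Pulsar
client.
```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not a Pulsar client class.
final class PartitionNameMatcher {

    // Builds the "<topic-name>-partition-[0-9]+" pattern for one topic.
    // Pattern.quote keeps characters such as '.' in the topic name literal.
    static Pattern partitionPattern(String topicName) {
        return Pattern.compile(Pattern.quote(topicName) + "-partition-[0-9]+");
    }

    // Keeps only the names that are partitions of the given topic.
    static List<String> partitionsOf(String topicName, List<String> candidates) {
        Pattern pattern = partitionPattern(topicName);
        return candidates.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample names are made up for this sketch.
        List<String> names = List.of(
                "persistent://public/default/orders-partition-0",
                "persistent://public/default/orders-partition-1",
                "persistent://public/default/orders-partition-x",
                "persistent://public/default/orders-v2-partition-0");
        System.out.println(partitionsOf("persistent://public/default/orders", names));
    }
}
```
The match itself is simple, but it only helps on the consumer side, which is
the point made above about producers.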

So I think PIP-145 [0] benefits regex subscriptions, and this PIP [1]
benefits the common partitioned-topic pub/sub scenario.
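
As a rough illustration of the version-based ack handling described in the
quoted PIP update further down this thread, the broker-side bookkeeping could
be sketched as below. Everything here is an assumption made for the sketch:
the class `InFlightUpdates`, the `Action` enum, and the method names are
hypothetical, an ack for an unknown watcher is treated as ignorable, and the
"should not happen" case is modeled as an exception. It is not the actual
`PartitonUpdateWatcherService` implementation and is not thread-safe.
```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the in-flight update map; names are illustrative.
final class InFlightUpdates {

    // What the broker should do after receiving an ack.
    enum Action { IGNORE, DONE, RESEND }

    // One in-flight update: the version sent and when it was sent.
    static final class Entry {
        final long version;
        final long sentAtMillis;

        Entry(long version, long sentAtMillis) {
            this.version = version;
            this.sentAtMillis = sentAtMillis;
        }
    }

    private final Map<Long, Entry> inFlight = new HashMap<>();

    // Records an update sent to a watcher; a greater version replaces an older one.
    void recordSent(long watcherId, long version, long nowMillis) {
        inFlight.merge(watcherId, new Entry(version, nowMillis),
                (oldEntry, newEntry) -> newEntry.version >= oldEntry.version ? newEntry : oldEntry);
    }

    // Applies the three version comparisons from the PIP description.
    Action onAck(long watcherId, long ackVersion, boolean success) {
        Entry entry = inFlight.get(watcherId);
        if (entry == null || ackVersion < entry.version) {
            return Action.IGNORE; // stale ack (assumption: unknown watcher is also ignored)
        }
        if (ackVersion > entry.version) {
            // "This should not happen"; surfacing it as an error is an assumption.
            throw new IllegalStateException("ack version ahead of in-flight version");
        }
        inFlight.remove(watcherId);
        return success ? Action.DONE : Action.RESEND;
    }

    // Used by the timer: true if an update is still unacked after the timeout.
    boolean isTimedOut(long watcherId, long nowMillis, long timeoutMillis) {
        Entry entry = inFlight.get(watcherId);
        return entry != null && nowMillis - entry.sentAtMillis >= timeoutMillis;
    }
}
```
On `RESEND` or a timeout, the broker would query the current partitions of the
watcher's topics and send a fresh update, as the quoted description says.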

[0] https://github.com/apache/pulsar/issues/14505
[1] https://github.com/apache/pulsar/issues/19596

Thanks
Xiaoyu Hou

On Sat, Feb 25, 2023 at 01:29, Michael Marshall <mmarsh...@apache.org> wrote:

> > It is just that the way to implement the partitioned-topic metadata
> > notification mechanism is much like notifications on regex sub changes
>
> Why do we need a separate notification implementation? The regex
> subscription feature is about discovering topics (not subscriptions)
> that match a regular expression. As Joe mentioned, I think we just
> need the client to "subscribe" to a topic notification for
> "<topic-name>-partition-[0-9]+" to eliminate the polling.
>
> Building on PIP 145, the work for this PIP would be in implementing a
> different `TopicsChangedListener` [1] so that the result of an added
> topic is to add a producer/consumer to the new partition.
>
> I support removing polling in our streaming platform, but I'd prefer
> to limit the number of notification systems we implement.
>
> Thanks,
> Michael
>
> [0] https://github.com/apache/pulsar/pull/16062
> [1]
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
>
>
>
> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com> wrote:
> >
> > Hi Joe,
> >
> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl, there is
> > a poll task that regularly fetches the metadata of the partitioned topic
> > to detect changes in the number of partitions. This PIP wants to use a
> > notification mechanism to replace the metadata poll task.
> >
> > It is just that the way to implement the partitioned-topic metadata
> > notification mechanism is much like notifications on regex sub changes
> >
> > On Fri, Feb 24, 2023 at 13:37, Joe F <joefranc...@gmail.com> wrote:
> >
> > > Why is this needed when we have notifications on regex sub changes?
> > > Aren't the partition names a well-defined regex?
> > >
> > > Joe
> > >
> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > >
> > > > Hi Asaf,
> > > > thanks for your reminder.
> > > >
> > > > ## Changes
> > > > I have made the following changes to make sure the notification
> > > > arrives successfully:
> > > > 1. The watch success response `CommandWatchPartitionUpdateSuccess`
> > > > will contain all the topics this watcher is concerned with.
> > > > 2. The notification `CommandPartitionUpdate` will always contain all
> > > > the topics this watcher is concerned with.
> > > > 3. The notification `CommandPartitionUpdate` contains a monotonically
> > > > increasing version.
> > > > 4. A map `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > Pair<version, long/*timestamp*/>>` will keep track of in-flight
> > > > updates.
> > > > 5. A timer will check for update timeouts through `inFlightUpdate`.
> > > > 6. The client acks with `CommandPartitionUpdateResult` to the broker
> > > > when it finishes updating.
> > > >
> > > > ## Details
> > > >
> > > > The following mechanism makes sure the newest notification arrives
> > > > successfully (copying the description from GH):
> > > >
> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`, will
> > > > keep track of watchers and will listen to changes in the metadata.
> > > > Whenever a topic's partitions are updated, it checks whether any
> > > > watchers should be notified and sends an update for all the topics
> > > > the watcher is concerned with through the ServerCnx. Then we record
> > > > this request in a map,
> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > > > Pair<version, long/*timestamp*/>>`. A timer checks for update
> > > > timeouts through `inFlightUpdate`. If an update sent to a watcher has
> > > > timed out, we query the partitions of all the topics the watcher is
> > > > concerned with and resend the update.
> > > >
> > > > The client acks with `CommandPartitionUpdateResult` to the broker
> > > > when it finishes updating. The broker handles the
> > > > `CommandPartitionUpdateResult` request as follows:
> > > >  - If CommandPartitionUpdateResult#version <
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > > > the broker ignores this ack.
> > > >  - If CommandPartitionUpdateResult#version ==
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version:
> > > >     - If CommandPartitionUpdateResult#success is true, the broker
> > > > just removes the watcherID from inFlightUpdate.
> > > >     - If CommandPartitionUpdateResult#success is false, the broker
> > > > removes the watcherID from inFlightUpdate, queries the partitions of
> > > > all the concerned topics, and resends the update.
> > > >  - If CommandPartitionUpdateResult#version >
> > > > PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > > > this should not happen.
> > > >
> > > > ## Edge cases
> > > > - Broker restarts or crashes
> > > > The client will reconnect to another broker, and the broker responds
> > > > with `CommandWatchPartitionUpdateSuccess` containing the partitions
> > > > of the topics the watcher is concerned with. We will call
> > > > `PartitionsUpdateListener` when the connection opens.
> > > > - Client ack fails or times out
> > > > The broker will resend the partitions of the topics the watcher is
> > > > concerned with if the client ack either fails or times out.
> > > > - Partitions are updated before the client acks
> > > > `CommandPartitionUpdate#version` monotonically increases every time
> > > > it is updated. If the partitions are updated before the client acks,
> > > > a greater version will be put into
> > > > `PartitonUpdateWatcherService#inFlightUpdate`. Acks for the previous
> > > > version will be ignored because their version is less than the
> > > > current version.
> > > >
> > > >
> > > > On Wed, Feb 22, 2023 at 21:33, Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > > >
> > > > > How about edge cases?
> > > > > In Andra's PIP he took into account cases where updates were lost,
> > > > > so he created a secondary poll. Not saying it's the best situation
> > > > > for your case of course.
> > > > > I'm saying that when a broker sends an update
> > > > > CommandPartitionUpdate, how do you know it arrived successfully?
> > > > > From my memory, there is no ACK in the protocol, saying "I'm the
> > > > > client, I got the update successfully" and only then it removed the
> > > > > "dirty" flag for that topic, for this watcher ID.
> > > > >
> > > > > Are there any other edge cases we can have? Let's be exhaustive.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <anonhx...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for your great suggestion, Enrico.
> > > > > >
> > > > > > I agree with you. It's more reasonable to add a
> > > > > > `supports_partition_update_watchers` flag in `FeatureFlags` to
> > > > > > detect whether the connected broker supports this feature, and
> > > > > > to add a new broker configuration property
> > > > > > `enableNotificationForPartitionUpdate` with a default value of
> > > > > > true, which is much like PIP-145.
> > > > > >
> > > > > > I have updated the description.
> > > > > >
> > > > > > On Wed, Feb 22, 2023 at 17:26, Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > >
> > > > > > > I support this proposal.
> > > > > > > Copying here my comments from GH:
> > > > > > >
> > > > > > > can't we enable this by default in case we detect that the
> > > > > > > connected Broker supports it?
> > > > > > > I can't find any reason for not using this mechanism if it is
> > > > > > > available.
> > > > > > >
> > > > > > > Maybe we can set the default to "true" and allow users to
> > > > > > > disable it in case it impacts their systems in an unwanted way.
> > > > > > >
> > > > > > > Maybe it would be useful to have a way to disable the mechanism
> > > > > > > on the broker side as well.
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > > On Wed, Feb 22, 2023 at 10:22, houxiaoyu
> > > > > > > <anonhx...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Pulsar community:
> > > > > > > >
> > > > > > > > I opened a PIP to discuss "Notifications for partitions
> > > > > > > > update"
> > > > > > > >
> > > > > > > > ### Motivation
> > > > > > > >
> > > > > > > > The Pulsar client polls brokers at a fixed interval to check
> > > > > > > > for partition updates if we publish to or subscribe to
> > > > > > > > partitioned topics with `autoUpdatePartitions` set to true.
> > > > > > > > This causes unnecessary load for both clients and brokers,
> > > > > > > > since most of the time the number of partitions will not
> > > > > > > > change. In addition, polling introduces latency in partition
> > > > > > > > updates, which is determined by
> > > > > > > > `autoUpdatePartitionsInterval`.
> > > > > > > > This PIP would like to introduce a notification mechanism for
> > > > > > > > partition updates, which is much like PIP-145 for regex
> > > > > > > > subscriptions:
> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > > > > > > >
> > > > > > > > For more details, please read the PIP at:
> > > > > > > > https://github.com/apache/pulsar/issues/19596
> > > > > > > > Looking forward to hearing your thoughts.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xiaoyu Hou
> > > > > > > > ----
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
>
