Hi Michael,

> is there a reason that we couldn't also use this to improve PIP 145?

The protocol described in this PIP could also be used to improve PIP-145.
However, I don't think it is a good idea to use the regex sub watcher to
implement the partitioned update watcher, for the other reasons we
mentioned above.

> Since we know we're using a TCP connection, is it possible to rely on
> pulsar's keep alive timeout (the broker and the client each have their
> own) to close a connection that isn't responsive?

I think it could still fail at the application layer, for example if the
partition update listener fails unexpectedly. Currently, another poll task
is scheduled whenever the partition auto-update timer task encounters an
error. [0]
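
For reference, here is a minimal sketch of that polling pattern. It is
illustrative only, not the real client code; the class and method names
are stand-ins, and only the netty timer API is the real one:

```
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the polling behavior referenced in [0]: the next
// check is scheduled even when the current poll fails, so a transient
// application-level failure only delays the update by one interval.
public class PartitionPollSketch {
    private final HashedWheelTimer timer = new HashedWheelTimer();
    private final long intervalSeconds = 60; // autoUpdatePartitionsInterval

    private final TimerTask pollTask = new TimerTask() {
        @Override
        public void run(Timeout timeout) {
            try {
                pollPartitionedTopicMetadata(); // may fail transiently
            } catch (Exception e) {
                // the error is tolerated; recovery relies on the next poll
            }
            // unconditionally schedule the next check
            timer.newTimeout(this, intervalSeconds, TimeUnit.SECONDS);
        }
    };

    public void start() {
        timer.newTimeout(pollTask, intervalSeconds, TimeUnit.SECONDS);
    }

    private void pollPartitionedTopicMetadata() {
        // placeholder: fetch metadata and extend producers/consumers
    }
}
```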

> Regarding the connection, which connection should the client use to send
> the watch requests?

The `PartitionUpdateWatcher` will call `connectionHandler.grabCnx()` to
open a connection, analogous to `TopicListWatcher`. [1]
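
Roughly, the wiring will look like the sketch below. Everything here is a
hypothetical stand-in (the real implementation will reuse the client's
internal `ConnectionHandler`, as `TopicListWatcher` does [1]); the point
is only that the watcher re-registers itself on every (re)connection:

```
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, self-contained sketch of the (re)connect flow.
interface WatchConnection {
    void sendWatchPartitionUpdate(long watcherId, List<String> topics);
}

class PartitionUpdateWatcherSketch {
    private final Supplier<WatchConnection> grabCnx; // stands in for ConnectionHandler#grabCnx
    private final long watcherId;
    private final List<String> topics;

    PartitionUpdateWatcherSketch(Supplier<WatchConnection> grabCnx,
                                 long watcherId, List<String> topics) {
        this.grabCnx = grabCnx;
        this.watcherId = watcherId;
        this.topics = topics;
        connectionOpened(); // register on the first connection
    }

    // Called on every (re)connection, so the broker-side watcher is
    // re-created after a broker restart or connection loss.
    void connectionOpened() {
        grabCnx.get().sendWatchPartitionUpdate(watcherId, topics);
    }
}
```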

> do we plan on using metadata store notifications to trigger the callbacks
> that trigger notifications sent to the clients?

Yes, we will simply look up the metadata store to fetch the partition
count, and register a watcher with the metadata store to be notified of
count updates.
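
Something along these lines (a sketch: `registerListener` and
`Notification` are the existing `MetadataStore` API, while the path prefix
and the handler body are assumptions):

```
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;

// Sketch: PartitionUpdateWatcherService reacting to metadata store
// notifications for partitioned-topic metadata paths.
class PartitionUpdateWatcherServiceSketch {
    private static final String PARTITIONED_TOPIC_PATH_PREFIX =
            "/admin/partitioned-topics/"; // assumed prefix

    PartitionUpdateWatcherServiceSketch(MetadataStore store) {
        store.registerListener(this::handleNotification);
    }

    private void handleNotification(Notification notification) {
        if (notification.getPath().startsWith(PARTITIONED_TOPIC_PATH_PREFIX)) {
            // look up the new partition count and notify the watchers
            // concerned with this topic (delivery/ack handled elsewhere)
        }
    }
}
```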

> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>
>     repeated string topics         = 3;
>     repeated uint32 partitions     = 4;
>
> What do you think about using a repeated message that represents a
> pair of a topic and its partition count instead of using two lists?

Great. Using a repeated message looks better; I will update the protobuf.
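
For example, something like this (illustrative shape only; field names and
numbers are not final):

```
message PartitionedTopic {
    required string topic      = 1;
    required uint32 partitions = 2;
}

message CommandWatchPartitionUpdateSuccess {
    required uint64 request_id = 1;
    required uint64 watcher_id = 2;
    repeated PartitionedTopic topics = 3;
}
```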

> How will we handle the case where a watched topic does not exist?

1. When `PulsarClient` calls `create()` to create a producer or
`subscribe()` to create a consumer, the client first fetches the
partitioned-topic metadata from the broker [2]. If the topic doesn't exist
and `isAllowAutoTopicCreation=true` on the broker, the partitioned-topic
zk node will be auto-created with the default number of partitions.
2. After the client gets the partitioned-topic metadata successfully, a
`PartitionedProducerImpl` is created if `meta.partition > 0`, and the
`PartitionUpdateWatcher` is initialized in the `PartitionedProducerImpl`
constructor. The `PartitionUpdateWatcher` sends a command to the broker to
register a watcher. If any topic in the topic list doesn't exist, the
broker sends an error to the client and the `PartitionedProducerImpl`
fails to start (see the example after this list). `MultiTopicsConsumerImpl`
works in the same way.
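
From the application's point of view nothing changes: assuming the
behavior above, the failure surfaces as an ordinary creation error through
the existing public client API, so no new handling is required:

```
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class CreateProducerExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        try {
            Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://tenant/ns/partitioned-topic")
                    .create();
            producer.close();
        } catch (PulsarClientException e) {
            // topic missing (with auto-creation disabled) or watcher
            // registration failed: producer creation simply fails
        } finally {
            client.close();
        }
    }
}
```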

> I want to touch on authorization. A role should have "lookup"
> permission to watch for updates on each partitioned topic that it
> watches. As a result, if we allow for a request to watch multiple
> topics, some might succeed while others fail. How do we handle partial
> success?

If any topic in the topic list fails authorization, the broker will send
an error to the client (a sketch of this fail-fast check follows below).
The following reasons support this behavior:
1. Before sending the command to register a partition update watcher, the
client should already have sent `CommandPartitionedTopicMetadata`, and so
should already hold the `lookup` permission [3] [4].
2. Currently, if any topic fails to subscribe, the whole consumer fails to
start. [5]
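
Here is a hypothetical sketch of that broker-side handling;
`topicExists`/`canLookup` are stand-ins for the real metadata and
authorization checks, not existing Pulsar APIs:

```
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Sketch: the watch request succeeds only if every topic in the list
// passes both the existence and the lookup-authorization checks; any
// single failure fails the whole CommandWatchPartitionUpdate.
class WatchRequestHandlerSketch {
    CompletableFuture<Void> handleWatch(List<String> topics, long requestId) {
        List<CompletableFuture<Boolean>> checks = topics.stream()
                .map(t -> topicExists(t).thenCombine(canLookup(t),
                        (exists, allowed) -> exists && allowed))
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(checks.toArray(new CompletableFuture[0]))
                .thenAccept(ignored -> {
                    boolean allPassed =
                            checks.stream().allMatch(CompletableFuture::join);
                    if (allPassed) {
                        sendWatchSuccess(requestId); // with partition counts
                    } else {
                        sendError(requestId); // client fails creation
                    }
                });
    }

    // Stand-ins for the real metadata lookup and authorization service:
    CompletableFuture<Boolean> topicExists(String topic) {
        return CompletableFuture.completedFuture(true);
    }
    CompletableFuture<Boolean> canLookup(String topic) {
        return CompletableFuture.completedFuture(true);
    }
    void sendWatchSuccess(long requestId) {}
    void sendError(long requestId) {}
}
```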


[0]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L1453-L1461

[1]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java#L67-L81

[2]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L365-L371

[3]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L903-L923

[4]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L558-L560

[5]
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L171-L193

Thanks,
Xiaoyu Hou

Michael Marshall <mmarsh...@apache.org> 于2023年3月7日周二 15:43写道:

> Thanks for the context Xiaoyu Hou and Asaf. I appreciate the
> efficiencies that we can gain by creating a specific implementation
> for the partitioned topic use case. I agree that this new notification
> system makes sense based on Pulsar's current features, and I have some
> implementation questions.
>
> >- If the broker sends notification and it's lost due network issues,
> > you'll only know about it due to the client doing constant polling, using
> > its hash to minimize response.
>
> I see that we implemented an ack mechanism to get around this. I
> haven't looked closely, but is there a reason that we couldn't also
> use this to improve PIP 145?
>
> Since we know we're using a TCP connection, is it possible to rely on
> pulsar's keep alive timeout (the broker and the client each have their
> own) to close a connection that isn't responsive? Then, when the
> connection is re-established, the client would get the latest topic
> partition count.
>
> Regarding the connection, which connection should the client use to
> send the watch requests? At the moment, the "parent" partitioned topic
> does not have an owner, but perhaps it would help this design to make
> a single owner for a given partitioned topic. This could trivially be
> done using the existing bundle mapping. Then, all watchers for a given
> partitioned topic would be hosted on the same broker, which should be
> more efficient. I don't think we currently redirect clients to any
> specific bundle when creating the metadata for a partitioned topic,
> but if we did, then we might be able to remove some edge cases for
> notification delivery because a single broker would update the
> metadata store and then trigger the notifications to the clients. If
> we don't use this implementation, do we plan on using metadata store
> notifications to trigger the callbacks that trigger notifications sent
> to the clients?
>
> > - Each time meta-update you'll need to run it through regular
> > expression, on all topics hosted on the broker, for any given client.
> > That's a lot of CPU.
> > - Suggested mechanism mainly cares about the count of partitions, so
> > it's a lot more efficient.
>
> I forgot the partition count was its own piece of metadata that the
> broker can watch for. That part definitely makes sense to me.
>
> One nit on the protobuf for CommandWatchPartitionUpdateSuccess:
>
>     repeated string topics         = 3;
>     repeated uint32 partitions     = 4;
>
> What do you think about using a repeated message that represents a
> pair of a topic and its partition count instead of using two lists?
>
> How will we handle the case where a watched topic does not exist?
>
> I want to touch on authorization. A role should have "lookup"
> permission to watch for updates on each partitioned topic that it
> watches. As a result, if we allow for a request to watch multiple
> topics, some might succeed while others fail. How do we handle partial
> success?
>
> One interesting detail is that this PIP is essentially aligned with
> notifying clients when topic metadata changes while PIP 145 was
> related to topic creation itself. An analogous proposal could request
> a notification for any topic that gets a new metadata label. I do not
> think it is worth considering that case in this design.
>
> Thanks,
> Michael
>
> [0] https://lists.apache.org/thread/t4cwht08d4mhp3qzoxmqh6tht8l0728r
>
> On Sun, Mar 5, 2023 at 8:01 PM houxiaoyu <anonhx...@gmail.com> wrote:
> >
> > Bump. Are there other concerns or suggestions about this PIP :)  Ping @
> > Michael @Joe @Enrico
> >
> > Thanks
> > Xiaoyu Hou
> >
> > houxiaoyu <anonhx...@gmail.com> 于2023年2月27日周一 14:10写道:
> >
> > > Hi Joe and Michael,
> > >
> > > I think I misunderstood what you replied before. Now I understand and
> > > explain it again.
> > >
> > > Besides the reasons what Asaf mentioned above, there are also some
> limits
> > > for using topic list watcher.  For example the `topicsPattern.pattern`
> must
> > > less that `maxSubscriptionPatternLeng` [0]. If the consumer subscribes
> > > multi partitioned-topics, the `topicsPattern.pattern` maybe very long.
> > >
> > > So I think that it's better to have a separate notification
> implementation
> > > for partition update.
> > >
> > > [0]
> > >
> https://github.com/apache/pulsar/blob/5d6932137d76d544f939bef27df25f61b4a4d00d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java#L115-L126
> > >
> > > Thanks,
> > > Xiaoyu Hou
> > >
> > > houxiaoyu <anonhx...@gmail.com> 于2023年2月27日周一 10:56写道:
> > >
> > >> Hi Michael,
> > >>
> > >> >  I think we just need the client to "subscribe" to a topic
> notification
> > >> for
> > >> >  "<topic-name>-partition-[0-9]+" to eliminate the polling
> > >>
> > >> If pulsar users want to pub/sub a partitioned-topic, I think most of
> the
> > >> users would like to create a simple producer or consumer like
> following:
> > >> ```
> > >> Producer<byte[]> producer =
> client.newProducer().topic(topic).create();
> > >> producer.sendAsync(msg);
> > >> ```
> > >> ```
> > >> client.newConsumer()
> > >>         .topic(topic)
> > >>         .subscriptionName(subscription)
> > >>         .subscribe();
> > >> ```
> > >> I think there is no reason for users to use `topicsPattern` if a
> pulsar
> > >> just wants to subscribe a partitioned-topic. In addition,
> `topicsPattern`
> > >> couldn't be used for producers.
> > >>
> > >> So I think PIP-145 [0] will benefit for regex subscriptions.  And this
> > >> PIP [1] will benefit for the common partitioned-topic pub/sub
> scenario.
> > >>
> > >> [0] https://github.com/apache/pulsar/issues/14505
> > >> [1] https://github.com/apache/pulsar/issues/19596
> > >>
> > >> Thanks
> > >> Xiaoyu Hou
> > >>
> > >> Michael Marshall <mmarsh...@apache.org> 于2023年2月25日周六 01:29写道:
> > >>
> > >>> > Just the way to implements partitioned-topic metadata
> > >>> > notification mechanism is much like notifications on regex sub
> changes
> > >>>
> > >>> Why do we need a separate notification implementation? The regex
> > >>> subscription feature is about discovering topics (not subscriptions)
> > >>> that match a regular expression. As Joe mentioned, I think we just
> > >>> need the client to "subscribe" to a topic notification for
> > >>> "<topic-name>-partition-[0-9]+" to eliminate the polling.
> > >>>
> > >>> Building on PIP 145, the work for this PIP would be in implementing a
> > >>> different `TopicsChangedListener` [1] so that the result of an added
> > >>> topic is to add a producer/consumer to the new partition.
> > >>>
> > >>> I support removing polling in our streaming platform, but I'd prefer
> > >>> to limit the number of notification systems we implement.
> > >>>
> > >>> Thanks,
> > >>> Michael
> > >>>
> > >>> [0] https://github.com/apache/pulsar/pull/16062
> > >>> [1]
> > >>>
> https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java#L169-L175
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Feb 24, 2023 at 1:57 AM houxiaoyu <anonhx...@gmail.com>
> wrote:
> > >>> >
> > >>> > Hi Joe,
> > >>> >
> > >>> > When we use PartitionedProducerImpl or MultiTopicsConsumerImpl,
> there
> > >>> is a
> > >>> > poll task to fetch the metadata of the partitioned-topic regularly
> for
> > >>> the
> > >>> > number of partitions updated.  This PIP wants to use a
> > >>> > notification mechanism to replace the metadata poll task.
> > >>> >
> > >>> > Just the way to implements partitioned-topic metadata
> > >>> > notification mechanism is much like notifications on regex sub
> changes
> > >>> >
> > >>> > Joe F <joefranc...@gmail.com> 于2023年2月24日周五 13:37写道:
> > >>> >
> > >>> > > Why is this needed when we have notifications on regex sub
> changes?
> > >>> Aren't
> > >>> > > the partition names a well-defined regex?
> > >>> > >
> > >>> > > Joe
> > >>> > >
> > >>> > > On Thu, Feb 23, 2023 at 8:52 PM houxiaoyu <anonhx...@gmail.com>
> > >>> wrote:
> > >>> > >
> > >>> > > > Hi Asaf,
> > >>> > > > thanks for your reminder.
> > >>> > > >
> > >>> > > > ## Changing
> > >>> > > > I have updated the following changes to make sure the
> notification
> > >>> > > arrived
> > >>> > > > successfully:
> > >>> > > > 1. The watch success response
> `CommandWatchPartitionUpdateSuccess`
> > >>> will
> > >>> > > > contain all the concerned topics of this watcher
> > >>> > > > 2. The notification `CommandPartitionUpdate` will always
> contain
> > >>> all the
> > >>> > > > concerned topics of this watcher.
> > >>> > > > 3. The notification `CommandPartitionUpdate`contains a
> > >>> monotonically
> > >>> > > > increased version.
> > >>> > > > 4. A map
> > >>> `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > >>> > > > Pair<version, long/*timestamp*/>>` will keep track of the
> updating
> > >>> > > > 5. A timer will check the updating timeout through
> `inFlightUpdate`
> > >>> > > > 6. The client acks `CommandPartitionUpdateResult` to broker
> when it
> > >>> > > > finishes updating.
> > >>> > > >
> > >>> > > > ## Details
> > >>> > > >
> > >>> > > > The following mechanism could make sure the newest notification
> > >>> arrived
> > >>> > > > successfully, copying the description from GH:
> > >>> > > >
> > >>> > > > A new class, `org.apache.pulsar.PartitonUpdateWatcherService`
> will
> > >>> keep
> > >>> > > > track of watchers and will listen to the changes in the
> metadata.
> > >>> > > Whenever
> > >>> > > > a topic partition updates it checks if any watchers should be
> > >>> notified
> > >>> > > and
> > >>> > > > sends an update for all topics the watcher concerns through the
> > >>> > > ServerCnx.
> > >>> > > > Then we will record this request into a map,
> > >>> > > > `PartitonUpdateWatcherService#inFlightUpdate<long/*watchID*/,
> > >>> > > Pair<version,
> > >>> > > > long/*timestamp*/>>`.  A timer will check this update timeout
> > >>> through
> > >>> > > > inFlightUpdate .  We will query all the concerned topics's
> > >>> partition if
> > >>> > > > this watcher has sent an update timeout and will resend it.
> > >>> > > >
> > >>> > > > The client acks `CommandPartitionUpdateResult` to broker when
> it
> > >>> finishes
> > >>> > > > updating.  The broker handle `CommandPartitionUpdateResult`
> > >>> request:
> > >>> > > >  - If CommandPartitionUpdateResult#version <
> > >>> > > >
> PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> > >>> > > broker
> > >>> > > > ignores this ack.
> > >>> > > >  -  If CommandPartitionUpdateResult#version ==
> > >>> > > >
> PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version
> > >>> > > >     - If CommandPartitionUpdateResult#success is true,  broker
> just
> > >>> > > removes
> > >>> > > > the watcherID from inFlightUpdate.
> > >>> > > >     - If CommandPartitionUpdateResult#success is false,  broker
> > >>> removes
> > >>> > > the
> > >>> > > > watcherId from inFlightUpdate, and queries all the concerned
> > >>> topics's
> > >>> > > > partition and resend.
> > >>> > > >  - If CommandPartitionUpdateResult#version >
> > >>> > > >
> > >>> PartitonUpdateWatcherService#inFlightUpdate.get(watcherID).version,
> this
> > >>> > > > should not happen.
> > >>> > > >
> > >>> > > >  ## Edge cases
> > >>> > > > - Broker restarts or crashes
> > >>> > > > Client will reconnect to another broker, broker responses
> > >>> > > > `CommandWatchPartitionUpdateSuccess` with watcher concerned
> > >>> topics's
> > >>> > > > partitions.  We will call `PartitionsUpdateListener` if the
> > >>> connection
> > >>> > > > opens.
> > >>> > > > - Client acks fail or timeout
> > >>> > > > Broker will resend the watcher concerned topics's partitions
> either
> > >>> > > client
> > >>> > > > acks fail or acks timeout.
> > >>> > > > - Partition updates before client acks.
> > >>> > > > `CommandPartitionUpdate#version` monotonically increases every
> > >>> time it is
> > >>> > > > updated. If Partition updates before client acks, a greater
> > >>> version will
> > >>> > > be
> > >>> > > > put into `PartitonUpdateWatcherService#inFlightUpdate`.  The
> > >>> previous
> > >>> > > acks
> > >>> > > > will be ignored because the version is less than the current
> > >>> version.
> > >>> > > >
> > >>> > > >
> > >>> > > > Asaf Mesika <asaf.mes...@gmail.com> 于2023年2月22日周三 21:33写道:
> > >>> > > >
> > >>> > > > > How about edge cases?
> > >>> > > > > In Andra's PIP he took into account cases where updates were
> > >>> lost, so
> > >>> > > he
> > >>> > > > > created a secondary poll. Not saying it's the best situation
> for
> > >>> your
> > >>> > > > case
> > >>> > > > > of course.
> > >>> > > > > I'm saying that when a broker sends an update
> > >>> CommandPartitionUpdate,
> > >>> > > how
> > >>> > > > > do you know it arrived successfully? From my memory, there
> is no
> > >>> ACK in
> > >>> > > > the
> > >>> > > > > protocol, saying "I'm the client, I got the update
> successfully"
> > >>> and
> > >>> > > only
> > >>> > > > > then it removed the "dirty" flag for that topic, for this
> > >>> watcher ID.
> > >>> > > > >
> > >>> > > > > Are there any other edge cases we can have? Let's be
> exhaustive.
> > >>> > > > >
> > >>> > > > >
> > >>> > > > >
> > >>> > > > > On Wed, Feb 22, 2023 at 1:14 PM houxiaoyu <
> anonhx...@gmail.com>
> > >>> wrote:
> > >>> > > > >
> > >>> > > > > > Thanks for your great suggestion Enrico.
> > >>> > > > > >
> > >>> > > > > > I agreed with you. It's more reasonable to add a
> > >>> > > > > > `supports_partition_update_watchers`  in `FeatureFlags`  to
> > >>> detect
> > >>> > > that
> > >>> > > > > the
> > >>> > > > > > connected broker supporting this feature , and add a new
> broker
> > >>> > > > > > configuration property
> `enableNotificationForPartitionUpdate`
> > >>> with
> > >>> > > > > default
> > >>> > > > > > value true, which is much like PIP-145.
> > >>> > > > > >
> > >>> > > > > > I have updated the descriptions.
> > >>> > > > > >
> > >>> > > > > > Enrico Olivelli <eolive...@gmail.com> 于2023年2月22日周三
> 17:26写道:
> > >>> > > > > >
> > >>> > > > > > > I support this proposal.
> > >>> > > > > > > Coping here my comments from GH:
> > >>> > > > > > >
> > >>> > > > > > > can't we enable this by default in case we detect that
> the
> > >>> > > connected
> > >>> > > > > > > Broker supports it ?
> > >>> > > > > > > I can't find any reason for not using this mechanism if
> it is
> > >>> > > > > available.
> > >>> > > > > > >
> > >>> > > > > > > Maybe we can set the default to "true" and allow users to
> > >>> disable
> > >>> > > it
> > >>> > > > > > > in case it impacts their systems in an unwanted way.
> > >>> > > > > > >
> > >>> > > > > > > Maybe It would be useful to have a way to disable the
> > >>> mechanism on
> > >>> > > > the
> > >>> > > > > > > broker side as well
> > >>> > > > > > >
> > >>> > > > > > > Enrico
> > >>> > > > > > >
> > >>> > > > > > > Il giorno mer 22 feb 2023 alle ore 10:22 houxiaoyu
> > >>> > > > > > > <anonhx...@gmail.com> ha scritto:
> > >>> > > > > > > >
> > >>> > > > > > > > Hi Pulsar community:
> > >>> > > > > > > >
> > >>> > > > > > > > I opened a PIP to discuss "Notifications for partitions
> > >>> update"
> > >>> > > > > > > >
> > >>> > > > > > > > ### Motivation
> > >>> > > > > > > >
> > >>> > > > > > > > Pulsar client will poll brokers at fix time for
> checking
> > >>> the
> > >>> > > > > partitions
> > >>> > > > > > > > update if we publish/subscribe the partitioned topics
> with
> > >>> > > > > > > > `autoUpdatePartitions` as true. This causes unnecessary
> > >>> load for
> > >>> > > > > both
> > >>> > > > > > > > clients and brokers since most of the time the number
> of
> > >>> > > partitions
> > >>> > > > > > will
> > >>> > > > > > > > not change. In addition polling introduces latency in
> > >>> partitions
> > >>> > > > > update
> > >>> > > > > > > >  which is specified by `autoUpdatePartitionsInterval`.
> > >>> > > > > > > > This PIP would like to introduce a notification
> mechanism
> > >>> for
> > >>> > > > > partition
> > >>> > > > > > > > update, which is much like PIP-145 for regex
> subscriptions
> > >>> > > > > > > > https://github.com/apache/pulsar/issues/14505.
> > >>> > > > > > > >
> > >>> > > > > > > > For more details, please read the PIP at:
> > >>> > > > > > > > https://github.com/apache/pulsar/issues/19596
> > >>> > > > > > > > Looking forward to hearing your thoughts.
> > >>> > > > > > > >
> > >>> > > > > > > > Thanks,
> > >>> > > > > > > > Xiaoyu Hou
> > >>> > > > > > > > ----
> > >>> > > > > > >
> > >>> > > > > >
> > >>> > > > >
> > >>> > > >
> > >>> > >
> > >>>
> > >>
>
