Hi Penghui and Bo,

I realized this problem as well. No new APIs should be added. I originally added the new API mainly to express that the semantics of `seek` would change while the semantics of `acknowledge` stay unchanged. I have now removed the new APIs and updated this proposal.
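A minimal sketch of what "no new APIs" could look like, using hypothetical stand-ins rather than the real Pulsar client classes: the multi-topics consumer keeps the existing `seek(MessageId)` signature and checks at runtime whether the argument is a `TopicMessageId`. Only `TopicMessageId`, `getOwnerTopic()` and `create()` come from this thread, and their shapes here are assumptions; `PositionId`, `TopicMessageIdImpl`, `inner()`, `MultiTopicsConsumerSketch` and `positionOf()` are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the client types discussed in this thread.
interface MessageId {}

// Assumed: a position inside one partition (ledger/entry pair).
record PositionId(long ledgerId, long entryId) implements MessageId {}

interface TopicMessageId extends MessageId {
    String getOwnerTopic();

    // Hypothetical accessor for the wrapped, topic-less position.
    MessageId inner();

    static TopicMessageId create(String topic, MessageId messageId) {
        return new TopicMessageIdImpl(topic, messageId);
    }
}

final class TopicMessageIdImpl implements TopicMessageId {
    private final String topic;
    private final MessageId inner;

    TopicMessageIdImpl(String topic, MessageId inner) {
        this.topic = topic;
        this.inner = inner;
    }

    @Override public String getOwnerTopic() { return topic; }
    @Override public MessageId inner() { return inner; }
}

final class MultiTopicsConsumerSketch {
    // Last seek position per subscribed topic (partition); null means never sought.
    private final Map<String, MessageId> positions = new HashMap<>();

    MultiTopicsConsumerSketch(String... topics) {
        for (String topic : topics) {
            positions.put(topic, null);
        }
    }

    // Existing signature: no seek(TopicMessageId) overload is added.
    void seek(MessageId messageId) {
        if (!(messageId instanceof TopicMessageId topicMessageId)) {
            throw new IllegalArgumentException(
                "a multi-topics consumer needs a TopicMessageId to pick the topic");
        }
        String owner = topicMessageId.getOwnerTopic();
        if (!positions.containsKey(owner)) {
            throw new IllegalArgumentException("not subscribed to " + owner);
        }
        // Route the wrapped position to the owner topic's internal consumer.
        positions.put(owner, topicMessageId.inner());
    }

    MessageId positionOf(String topic) {
        return positions.get(topic);
    }
}
```

The caller passes whatever it holds as a `MessageId`, with no cast; in this sketch only an id that does not carry its owner topic is rejected.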
Thanks,
Yunze

On Thu, Dec 22, 2022 at 10:55 AM 丛搏 <bog...@apache.org> wrote:

> Users can use
> ```
> MessageId.fromByteArrayWithTopic()
> ```
> or
> ```
> TopicMessageId.fromByteArray();
> TopicMessageId.create();
> ```

I think this is a good point. Since the `TopicName` is not in the `MessageId`, the multi-topics consumer must know that this topic is a partitioned topic and attach the `TopicName` to the `MessageId`, and no new interfaces need to be added.

Thanks,
Bo

On Thu, Dec 22, 2022 at 09:50, PengHui Li <peng...@apache.org> wrote:

> Because `TopicMessageId` is constructed by the Pulsar client library itself, which can guarantee that `getOwnerTopic()` returns the correct topic name. The benefit of passing a `TopicMessageId` rather than the combination of a topic name and a `MessageId` is that users won't need to care about how to get the correct topic name for a given partition by themselves.

This is a good point for why we should not introduce `seek(TopicName, MessageId)`. It would be better to also mention this in the proposal; it will help us understand why `seek(TopicName, MessageId)` is not a good fit for Pulsar.

Thanks,
Penghui

On Thu, Dec 22, 2022 at 9:48 AM PengHui Li <peng...@apache.org> wrote:

> In short, `seek(msgId)` will call `seek(TopicMessageId)` if `msgId` is a `TopicMessageId`.

Does that mean we don't need to add the following new APIs?

```
void seek(TopicMessageId topicMessageId) throws PulsarClientException;
CompletableFuture<Void> seekAsync(TopicMessageId topicMessageId);
```

Users can use
```
MessageId.fromByteArrayWithTopic()
```
or
```
TopicMessageId.fromByteArray();
TopicMessageId.create();
```
to construct a TopicMessageId instance.
But we can use the existing `seek` API with the constructed `TopicMessageId` instance:

```
consumer.seek(MessageId messageId);
```

Thanks,
Penghui

On Thu, Dec 22, 2022 at 12:35 AM Yunze Xu <y...@streamnative.io.invalid> wrote:

> but it needs to cast the `TopicMessageId` from `MessageId`, which is very user-unfriendly.

Sorry, I think my proposal didn't express this well. In my original thinking, no cast is needed; please see the update in https://github.com/apache/pulsar/issues/18616.

In short, `seek(msgId)` will call `seek(TopicMessageId)` if `msgId` is a `TopicMessageId`.

Thanks,
Yunze

On Wed, Dec 21, 2022 at 11:26 PM 丛搏 <bog...@apache.org> wrote:

> If you mean `msg.getTopicName()`, how can you declare it's better than `msgId.getOwnerTopic()`?
>
> It can, because the `TopicMessageIdImpl` already contains the correct topic. That's the point.

```
var msgId = (TopicMessageId) multiTopicsConsumer.receive().getMessageId();
```

If `msgId.getOwnerTopic()` were part of the `MessageId` interface, I would have no problem, but it needs to cast the `TopicMessageId` from `MessageId`, which is very user-unfriendly. And it doesn't make sense.

> I don't know what you're thinking about using `consumer.seek(msg.getTopicName(), msg.getMessageId())` for a single topic consumer. If it's accepted, and you want to unify the use case of `seek`, the original `seek` API should be deprecated and much existing code could be affected. If it's not accepted, users have to distinguish if a consumer is a multi-topics consumer.
Likewise, `consumer.seek(TopicMessageId topicMessageId)` for a single-topic consumer is also strange. My point is that either the `TopicName` belongs to the `MessageId`, or the two should be kept separate. Coupling them together is not a good interface design; it is very unclear.

Thanks,
Bo

On Wed, Dec 21, 2022 at 22:46, Yunze Xu <y...@streamnative.io.invalid> wrote:

> If the MessageId does not contain `TopicName`, the `TopicName` is best obtained from `msg`.

If you mean `msg.getTopicName()`, how can you declare it's better than `msgId.getOwnerTopic()`?

> it still cannot avoid needing a `TopicName` to mark that this `MessageID` belongs to this topic.

It can, because the `TopicMessageIdImpl` already contains the correct topic. That's the point.

> If using `TopicMessageId` also has the same problem, why do we need to use `TopicMessageId` instead of `MessageId`?

Because `TopicMessageId` is constructed by the Pulsar client library itself, which can guarantee that `getOwnerTopic()` returns the correct topic name. The benefit of passing a `TopicMessageId` rather than the combination of a topic name and a `MessageId` is that users won't need to care about how to get the correct topic name for a given partition by themselves.

The key point is that if an argument has only one valid value, and that value depends on another argument, then the API design is bad.
Assume you need to use the Pulsar client like this:

```
// numberOfMessages must be the same as msgIds.size(); otherwise, an exception will be thrown
consumer.acknowledge(numberOfMessages, msgIds);
```

With the API in this proposal, users don't need to care much about how to call `seek` correctly, except when the MessageId is returned by `Producer#send`. Otherwise, `consumer.seek(msg.getMessageId())` works for both single-topic and multi-topics consumers.

With the `seek(String, MessageId)` API, you would have to write more explanations like:
1. If the consumer only subscribes to one topic, use `consumer.seek(msg.getMessageId())`.
2. If the consumer subscribes to multiple topics, use `consumer.seek(topic, msg.getMessageId())`. The topic must be the one the message belongs to, so you have to pass the correct topic, e.g. `consumer.seek(msg.getTopicName(), msg.getMessageId())`. Otherwise, seek would fail.

I don't know what you're thinking about using `consumer.seek(msg.getTopicName(), msg.getMessageId())` for a single topic consumer. If it's accepted, and you want to unify the use case of `seek`, the original `seek` API should be deprecated and much existing code could be affected. If it's not accepted, users have to distinguish if a consumer is a multi-topics consumer.
Thanks,
Yunze

On Wed, Dec 21, 2022 at 8:50 PM 丛搏 <bog...@apache.org> wrote:

Hi, Yunze:

> ```java
> var msg = multiTopicsConsumer.receive();
> var msgId = (TopicMessageId) msg.getMessageId();
> multiTopicsConsumer.seek(msgId.getOwnerTopic(), msgId);
> ```

The code can look like this:

```java
var msg = anyConsumer.receive();
var msgId = msg.getMessageId();
anyConsumer.seek(msg.getTopicName(), msgId);
```

If the MessageId does not contain `TopicName`, the `TopicName` is best obtained from `msg`.

> What's different is that the offset in Kafka can represent a position of ANY partition, while the MessageId in Pulsar can only represent the position of A SPECIFIC partition.

Although a MessageId in Pulsar can only represent the position of A SPECIFIC partition, it still needs a TopicName. `LedgerID` and `EntryID` do not by themselves mark which topic this `MessageID` belongs to (although it does belong to one), so it still cannot avoid needing a `TopicName` to mark that this `MessageID` belongs to this topic.

> And in Pulsar, we also do not expose the partition concept. If we introduced a seek API with the topic name as an argument, we would have to explain in detail what the topic name of a partition is. From my experience explaining the "partition" concept in the community, that could be very confusing.
If using `TopicMessageId` also has the same problem, why do we need to use `TopicMessageId` instead of `MessageId`?

Thanks,
Bo

On Wed, Dec 21, 2022 at 16:59, Yunze Xu <y...@streamnative.io.invalid> wrote:

Hi Bo,

If we have a `seek` API that accepts a topic name, using `seek` will differ between a single-topic consumer and a multi-topics consumer.

```java
var msg = singleTopicConsumer.receive();
var msgId = msg.getMessageId();
singleTopicConsumer.seek(msgId);
```

```java
var msg = multiTopicsConsumer.receive();
var msgId = (TopicMessageId) msg.getMessageId();
multiTopicsConsumer.seek(msgId.getOwnerTopic(), msgId);
```

It's not as clear as you think. A question could arise from the code above: since we can get the key (topic name) from `msgId` itself, why do we need another argument?

What's worse is that users have to specify the correct topic name. For a partitioned topic, if users specified another partition, the `seek` operation would fail. If they specified something like `multiTopicsConsumer.getTopic()`, it would also fail, because other APIs like `Consumer#getTopic()` don't return the correct topic name.

If there is only one correct topic name for a given TopicMessageId, what's the point of making it a required argument?
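To make the "only one valid value" argument concrete, here is a hedged sketch of the alternative `seek(String topic, MessageId id)` design. It assumes, purely for illustration, a hypothetical `PartitionPosition` record that remembers which partition produced it (a ledger/entry pair is only meaningful inside its own partition); `TwoArgSeekConsumer` is likewise invented. Every topic other than the owner partition, including the partitioned topic's base name, is rejected, so the extra argument adds a way to fail without adding information.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical position that belongs to exactly one partition.
record PartitionPosition(String ownerPartition, long ledgerId, long entryId) {}

final class TwoArgSeekConsumer {
    // Subscribed partitions mapped to the last sought position (null = never sought).
    private final Map<String, PartitionPosition> positions = new HashMap<>();

    TwoArgSeekConsumer(String... partitions) {
        for (String partition : partitions) {
            positions.put(partition, null);
        }
    }

    // The alternative design: the caller must repeat the owner topic.
    void seek(String topic, PartitionPosition id) {
        if (!positions.containsKey(topic)) {
            throw new IllegalArgumentException("not subscribed to " + topic);
        }
        if (!topic.equals(id.ownerPartition())) {
            // A ledger/entry pair from another partition means nothing here.
            throw new IllegalArgumentException(
                "position belongs to " + id.ownerPartition() + ", not " + topic);
        }
        positions.put(topic, id);
    }

    PartitionPosition positionOf(String topic) {
        return positions.get(topic);
    }
}
```

Seeking with the owner partition succeeds; passing a sibling partition or the base topic name throws, which is exactly the failure mode described above.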
BTW, let's look at the Kafka client's commit API:

```java
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
```

What's different is that the offset in Kafka can represent a position of ANY partition, while the MessageId in Pulsar can only represent the position of A SPECIFIC partition. And in Pulsar, we also do not expose the partition concept. If we introduced a seek API with the topic name as an argument, we would have to explain in detail what the topic name of a partition is. From my experience explaining the "partition" concept in the community, that could be very confusing.

Thanks,
Yunze

On Wed, Dec 21, 2022 at 3:20 PM 丛搏 <bog...@apache.org> wrote:

Hi Yunze,

Adding `TopicMessageId` will couple the message ID and the topic name together, which is very unclear for non-partitioned topics.

```
void seek(String topicName, MessageId messageId) throws PulsarClientException;
List<Map<String, MessageId>> getLastTopicMessageId() throws PulsarClientException;
```

If the interface is designed this way, it may be simpler, easier to understand, and more intuitive for users, and the MessageID will not be coupled with the TopicName.
Because this PIP has already started a VOTE, I will sync this reply to PIP-224-VOTE [0].

Thanks,
Bo

[0] https://lists.apache.org/thread/mbrpjsgrgwrlkdpvkk738jxnlk7rf4qk

On Fri, Dec 9, 2022 at 14:33, Yunze Xu <y...@streamnative.io.invalid> wrote:

Hi Jiaqi,

Let's move to https://lists.apache.org/thread/mbrpjsgrgwrlkdpvkk738jxnlk7rf4qk for the vote.

Thanks,
Yunze

On Fri, Dec 9, 2022 at 1:54 PM Jiaqi Shen <gleiphir2...@gmail.com> wrote:

This makes sense to me, +1

Thanks,
Jiaqi Shen

On Wed, Dec 7, 2022 at 13:51, Yunze Xu <y...@streamnative.io.invalid> wrote:

Hi Baodi,

I decided not to change the behavior of the `negativeAcknowledge` method. I checked again: this method has no exception signature, and there is no asynchronous version like `negativeAcknowledgeAsync`. To keep the API compatible, we should not add an exception signature, which would be required if a `PulsarClientException` were thrown.
Thanks,
Yunze

On Tue, Nov 29, 2022 at 10:12 PM Baodi Shi <baodi....@icloud.com.invalid> wrote:

Hi, Yunze:

Thanks for your proposal. It looks good to me.

`negativeAcknowledge` also needs the same checks as the new acknowledge interface.

> This interface doesn't add any acknowledge overload because the overloads are already too many. But it will make the behavior clear.

I think that since we exposed the TopicMessageId, it would be better to add overloaded interfaces (even if there are already many overloads). This would help users clearly associate the use cases of MultiTopicConsumer and TopicMessageId.

Also, while it's okay to use a TopicMessageId param on a single-topic consumer, I guess we shouldn't allow users to do so.
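One Java detail is relevant to the overload suggestion: overloads are chosen from the compile-time type of the argument, not its runtime class. The sketch below uses hypothetical stand-in types (`SimpleTopicMessageId`, `OverloadSketch`), not the real client. It shows that an id held in a `MessageId` variable reaches the `MessageId` overload even when the object is a `TopicMessageId`, whereas a runtime `instanceof` check inside a single method sees the real type.

```java
// Hypothetical stand-ins; not the real Pulsar client types.
interface MessageId {}

interface TopicMessageId extends MessageId {
    String getOwnerTopic();
}

final class SimpleTopicMessageId implements TopicMessageId {
    private final String topic;

    SimpleTopicMessageId(String topic) { this.topic = topic; }

    @Override public String getOwnerTopic() { return topic; }
}

final class OverloadSketch {
    // The overload is picked at compile time from the static argument type.
    static String acknowledge(MessageId id) { return "MessageId overload"; }

    static String acknowledge(TopicMessageId id) { return "TopicMessageId overload"; }

    // A single method that inspects the runtime type instead.
    static String acknowledgeWithRuntimeCheck(MessageId id) {
        if (id instanceof TopicMessageId topicId) {
            return "routed to " + topicId.getOwnerTopic();
        }
        return "plain MessageId";
    }
}
```

So with overloads alone, code that holds a `MessageId` variable would silently take the generic path, and a runtime check would still be needed there.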
This way, users are clearly aware that TopicMessageId is used with MultiTopicConsumer and MessageId is used with SingleTopicConsumer. (Maybe it's not a good idea.)

Thanks,
Baodi Shi

On Nov 29, 2022, at 15:57, Yunze Xu <y...@streamnative.io.INVALID> wrote:

> Is there a case where the user uses the messageId returned by the producer to seek in the consumer? Is this a good behavior?

Yes, I think it should be acceptable. To correct my previous point, I now think the MessageId returned by `send` should also be usable for seek or acknowledge.

> even with the current proposal, it may return null when getting the topic from TopicMessageId for backward compatibility.

No. It may return null only because Java cannot declare that a returned value is non-null. The internal implementations of `TopicMessageId#getOwnerTopic` should return a non-null topic name so that callers don't need a null check.

When I mentioned **the implementation of getTopicName() must return null**, the assumption was that `MessageId#toByteArray` would serialize the topic name if the `getTopicName()` method were added.
However, in this proposal, `TopicMessageId#toByteArray` won't. See the implementation of `TopicMessageId#create`: it's only a wrapper for an arbitrary MessageId implementation.

Thanks,
Yunze

On Tue, Nov 29, 2022 at 2:47 PM Zike Yang <z...@apache.org> wrote:

Hi Yunze,

Thanks for your proposal. Quoted from your GitHub comment [0]:

> There is also a case when MessageId is returned from Producer#send. In this case, the returned MessageId should only be used for serialization

Is there a case where the user uses the messageId returned by the producer to seek in the consumer? Is this a good behavior?

> If we added the method directly to MessageId, to keep the backward compatibility, the implementation of getTopicName() must return null, which is not a good design.

I think it's a trade-off. If I understand correctly, even with the current proposal, it may return null when getting the topic from TopicMessageId for backward compatibility. The current TopicMessageIdImpl doesn't serialize the topic information.
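Both Zike's point and Yunze's reply hinge on `TopicMessageId#create` being a thin wrapper whose `toByteArray` delegates to the wrapped id. A minimal sketch under that assumption: the `LedgerEntryId` layout (two 8-byte longs) and the `Wrapper` class are hypothetical. The bytes are identical with or without the topic, so a deserialized id comes back without any owner topic.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins for the client types discussed in this thread.
interface MessageId {
    byte[] toByteArray();
}

// Assumed layout: ledger id followed by entry id, 8 bytes each.
record LedgerEntryId(long ledgerId, long entryId) implements MessageId {
    @Override
    public byte[] toByteArray() {
        return ByteBuffer.allocate(16).putLong(ledgerId).putLong(entryId).array();
    }

    static LedgerEntryId fromByteArray(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new LedgerEntryId(buffer.getLong(), buffer.getLong());
    }
}

interface TopicMessageId extends MessageId {
    String getOwnerTopic();

    static TopicMessageId create(String topic, MessageId messageId) {
        return new Wrapper(topic, messageId);
    }
}

final class Wrapper implements TopicMessageId {
    private final String topic;
    private final MessageId delegate;

    Wrapper(String topic, MessageId delegate) {
        this.topic = topic;
        this.delegate = delegate;
    }

    @Override public String getOwnerTopic() { return topic; }

    // Only the wrapped id is serialized; the topic is not written.
    @Override public byte[] toByteArray() { return delegate.toByteArray(); }
}
```

In this sketch, a round trip through bytes loses the owner topic unless the caller re-attaches it, for example with `TopicMessageId.create`.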
> > >> > > > > > > > > > >> > > >> > > > > > > > > > >> > > >> > > > > > > > > > >> [0] > > >> > > > > > > > > > > >> https://github.com/apache/pulsar/issues/18616#issuecomment-1328609346 > > >> > > > > > > > > > >> > > >> > > > > > > > > > >> Thanks, > > >> > > > > > > > > > >> Zike Yang > > >> > > > > > > > > > >> > > >> > > > > > > > > > >> On Mon, Nov 28, 2022 at 12:22 PM Yunze Xu > > >> > > > > > > > > <y...@streamnative.io.invalid> wrote: > > >> > > > > > > > > > >>> > > >> > > > > > > > > > >>> Hi all, > > >> > > > > > > > > > >>> > > >> > > > > > > > > > >>> I've opened a PIP to discuss: > > >> > > > > > > > > https://github.com/apache/pulsar/issues/18616. > > >> > > > > > > > > > >>> > > >> > > > > > > > > > >>> The consumer's MessageId related APIs have some > > >> hidden requirements > > >> > > > > > > > > > >>> and flakiness and some behaviors are not documented > > >> well. This > > >> > > > > > > > > > >>> proposal will introduce a TopicMessageId interface > > >> that exposes a > > >> > > > > > > > > > >>> method to get a message's owner topic. > > >> > > > > > > > > > >>> > > >> > > > > > > > > > >>> P.S. There was an email [1] that didn't add the > > >> "[DISCUSS]" label, > > >> > > > > > > > > > >>> which might be a little confusing. So I sent the > > >> email again for > > >> > > > > > > > > > >>> discussion. Please do not reply to the previous > > >> email. > > >> > > > > > > > > > >>> > > >> > > > > > > > > > >>> [1] > > >> https://lists.apache.org/thread/6gj16pmrjk6ncsd30xrl20pr5ng6t61o > > >> > > > > > > > > > >>> > > >> > > > > > > > > > >>> Thanks, > > >> > > > > > > > > > >>> Yunze > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > >