FWIW, the Flink Pulsar connector hackily parses the message ID internals to
get the next message ID:
https://github.com/apache/flink/blob/421f057a7488fd64854a82424755f76b89561a0b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java
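
For contrast, here is a minimal sketch of the opaque style argued for in the thread below: the ID keeps its fields private and offers only serialization and comparison, so callers never parse internals. `OpaqueId`, its factory `of`, and the byte layout are hypothetical and are not the Pulsar `MessageId` API; only the method names `toByteArray`/`fromByteArray` mirror the ones mentioned in the thread.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

// Hypothetical opaque identifier for illustration; NOT the Pulsar MessageId API.
final class OpaqueId implements Comparable<OpaqueId> {
    // Assumed internal layout; private so callers cannot depend on it.
    private final long ledgerId;
    private final long entryId;
    private final int batchIndex;

    private OpaqueId(long ledgerId, long entryId, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    // Stand-in for whatever the library does internally to mint an ID.
    static OpaqueId of(long ledgerId, long entryId, int batchIndex) {
        return new OpaqueId(ledgerId, entryId, batchIndex);
    }

    // Serialize for storage in an external system.
    byte[] toByteArray() {
        return ByteBuffer.allocate(2 * Long.BYTES + Integer.BYTES)
                .putLong(ledgerId)
                .putLong(entryId)
                .putInt(batchIndex)
                .array();
    }

    // Restore an ID previously produced by toByteArray().
    static OpaqueId fromByteArray(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        return new OpaqueId(buf.getLong(), buf.getLong(), buf.getInt());
    }

    // Ordering is provided by the type, so callers never compare raw fields.
    @Override
    public int compareTo(OpaqueId other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        if (c == 0) c = Long.compare(entryId, other.entryId);
        if (c == 0) c = Integer.compare(batchIndex, other.batchIndex);
        return c;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof OpaqueId && compareTo((OpaqueId) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, batchIndex);
    }
}
```

In this sketch an application would persist `id.toByteArray()` and later call `OpaqueId.fromByteArray(bytes)`; code that only stores, restores, and compares IDs never touches the fields.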

On Thu, Nov 10, 2022 at 01:03, Enrico Olivelli <eolive...@gmail.com> wrote:

> After reading Joe's comments I have changed my mind.
> Actually it is better to not expose "ledgerId" and "entryId" to client
> applications.
> They are useless pieces of information.
> And also if in the future we want to change the way we internally
> address a message we will always have to support these fields.
>
> toByteArray() is enough for an application to save the ID into an
> external database and then to recover a Subscription (or a Reader)
> from a certain point.
> toString() is good only for debugging and logs. We could change it, but it is
> better to leave it untouched and add tests for it.
>
> It is better that the MessageId API stays as opaque as possible.
>
> Enrico
>
>
> On Wed, Nov 9, 2022 at 15:50, Yunze Xu
> <y...@streamnative.io.invalid> wrote:
> >
> > Hi Jiaqi,
> >
> > > I don't think `tostring` should be used in any serious case because it has
> > > no standard.
> >
> > I agree, but it's better to keep it unchanged. As I said in my previous
> > reply, it might be a de facto standard because `toString()`-like methods are
> > used in logging, not only for debugging. For example, the consumer has a
> > getLastMessageId API, and users might log the last message ID.
> >
> > Different representations across languages are not a big issue, but they
> > could confuse users and administrators.
> >
> > For example, the C++ client supports constructing a MessageId with
> > 4 arguments, but the 1st argument is the partition, not the ledger id.
> > However, the string representation is still
> > "<ledger-id>:<entry-id>:<partition>:<batch-index>", whereas in the Java
> > client a non-batched message ID doesn't have the ":<batch-index>" suffix.
> >
> > Thanks,
> > Yunze
> >
> > On Wed, Nov 9, 2022 at 9:13 PM Jiaqi Shen <gleiphir2...@gmail.com> wrote:
> > >
> > > Thanks, this is very inspiring to me.
> > >
> > > But I have a different opinion on `tostring`.
> > >
> > > > >>You can only see a representation from `toString` method and got some
> > > > >>output like "0:0:-1:0".
> > >
> > > > I don't think `tostring` should be used in any serious case because it has
> > > > no standard. There is no constraint on how the messageId should be
> > > > converted to a string. For example, the Go client does not support
> > > > `tostring` yet. If the Go client implements a `tostring` method, must it
> > > > follow the Java implementation and produce something like "0:0:-1:0"?
> > >
> > > > If users do need a string/[]byte to record a messageId, `toByteArray` will
> > > > be enough. On the user side, most of the time, I think users don't really
> > > > care whether the "messageId string" is meaningful. I think `tostring`
> > > > should only be used for debugging.
> > >
> > > Thanks,
> > > Jiaqi Shen
> > >
> > >
> > > > On Wed, Nov 9, 2022 at 20:25, Joe F <joefranc...@gmail.com> wrote:
> > >
> > > > > A MessageId is an identifier that identifies a message. How that id is
> > > > > constructed, or what it contains, should not matter to an application, and
> > > > > an application should not assume anything about the implementation of that
> > > > > id.
> > > >
> > > > > >What about the partition index? We have a `TopicMetadata` interface that
> > > > > >returns the number of partitions.
> > > >
> > > > > Partitioning is a first-class concept, and is designed to be used by
> > > > > applications. How a partition is implemented should not be used by the
> > > > > application.
> > > >
> > > >  [ People violate this all the time, and I regret that Pulsar did not
> > > > provide get_Nth_topicpartion(), which led to people hardcoding it  as
> > > > topicname-N. and using that directly.  Now we are stuck with it.]
> > > >
> > > >  Similarly batch index and batch size. Those are all logical concepts
> > > > exposed to the user.  For eg: batch size is something the app is
> allowed to
> > > > tune
> > > >
> > > > > >Even for ledger id and entry id, this pair represents a logic storage
> > > > > >position like the offset concept in Kafka
> > > > >
> > > > > These are not equivalent. In Pulsar these are implementation details,
> > > > > while in Kafka those are logical concepts.
> > > >
> > > > > One might think that these are logical concepts in Pulsar, because if you
> > > > > reverse engineer the current msgid implementation, you observe some
> > > > > "properties".
> > > >
> > > > > Ledger id/entry id are logical concepts in __Bookkeeper__, not in Pulsar.
> > > > > There is the Managed Ledger abstraction on top of BK, and then there is
> > > > > Pulsar on top of ML. You will break two levels of abstraction to expose
> > > > > ledger/entryid to an application.
> > > >
> > > > > An application should only care about the operations that can be done
> > > > > with a messageId:
> > > >
> > > > - getmsgid() to return the message id  as an opaque object
> > > >
> > > > > [Operators using one messageId]
> > > > > - serde, like tostring(), for storage/retrieval of the message identifier
> > > > > - getter/setter on logical properties of the message (partition id etc...)
> > > > > - increment/decrement
> > > > >
> > > > > [Operators that take multiple messageIds]
> > > > > - comparator
> > > > > - range
> > > >
> > > > > Those are the kind of operators Pulsar should provide to a user.
> > > > > Applications should not implement these operators on their own by reverse
> > > > > engineering the msgId. No application should be directly using ledgerid or
> > > > > entryid for doing anything (math or logic).
> > > >
> > > >   As long as Pulsar provides  these operations  with msgid to the
> > > > application,  it should not care whether it's represented as
> "0:1:-1:-1"
> > > > or  "a:b:-b-b", or   "#xba4231!haxcy1826923f" or as a serialized
> binary
> > > > object or..whatever it may be.
> > > >
> > > > >>But it would be harder to know a tuple like "0:1:-1:-1" means.
> > > >
> > > > A user shouldn't have to know what this means. That's the point.
> > > >
> > > > > Pulsar itself changed the messageId multiple times as it added
> > > > > partitioning, batching and so on, and it might do so again. And bookkeeper
> > > > > could change its representation of ledgers (e.g., to uuids and byte
> > > > > offsets). ML could replace BK with something else (e.g., a table in a
> > > > > db). Anything is possible. Pulsar would then just have to change the
> > > > > implementation of the operator functions, and no application needs to be
> > > > > rewritten.
> > > >
> > > > -j
> > > >
> > > > > On Tue, Nov 8, 2022 at 6:05 PM Yunze Xu <y...@streamnative.io.invalid>
> > > > > wrote:
> > > >
> > > > > Hi Joe,
> > > > >
> > > > > > Then what would we expect users to do with the MessageId? Should it only
> > > > > > be passed to Consumer#seek or ReaderBuilder#startMessageId?
> > > > >
> > > > > > What about the partition index? We have a `TopicMetadata` interface that
> > > > > > returns the number of partitions. If the partition is also an
> > > > > > "implementation detail", should we expose this interface? Or should we
> > > > > > support customizing a MessageRouter, given that it returns the partition
> > > > > > index?
> > > > >
> > > > > > What about the batch index and batch size? For example, we have an
> > > > > > enableBatchIndexAcknowledgment method to enable batch index ACK. If batch
> > > > > > index is also an "implementation detail", how could users know what
> > > > > > "batch index ack" means?
> > > > >
> > > > > > Even for ledger id and entry id, this pair represents a logical storage
> > > > > > position like the offset concept in Kafka (though each offset represents
> > > > > > a message while each entry represents a batch). If you look at the Message
> > > > > > API, it also exposes many attributes. IMO, for the MessageIdData, only the
> > > > > > ack_set (a long array serialized from the BitSet) is an implementation
> > > > > > detail.
> > > > >
> > > > > > The MessageId API should be flexible, not an abstract one. If not, why
> > > > > > do we still implement the toString() method? We should not encourage
> > > > > > users to print the MessageId. It would be easy to know what "ledger is 0,
> > > > > > entry id is 1" means; users only need to know the concepts of ledger id
> > > > > > and entry id. But it would be harder to know what a tuple like
> > > > > > "0:1:-1:-1" means.
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > > On Tue, Nov 8, 2022 at 11:16 PM Joe F <joefranc...@gmail.com> wrote:
> > > > > >
> > > > > > >Maybe this design is to hidden some details, but if
> > > > > > users don't know the details like ledger id and entry id, how
> could
> > > > > > you know what does "0:0:-1:0" mean?
> > > > > >
> > > > > >  Abstractions exist for a reason. Ledgerid and entryid are
> > > > > implementation
> > > > > > details, and an application should not be interpreting that at
> all.
> > > > > > -j
> > > > > >
> > > > > >
> > > > > > > On Tue, Nov 8, 2022 at 3:43 AM Yunze Xu <y...@streamnative.io.invalid>
> > > > > > > wrote:
> > > > > >
> > > > > > > > I haven't looked into these two methods yet, but I think it's
> > > > > > > > possible to retain only `fromByteArray`.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yunze
> > > > > > >
> > > > > > > > On Tue, Nov 8, 2022 at 7:02 PM Enrico Olivelli <eolive...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > On Tue, Nov 8, 2022 at 11:52, Yunze Xu
> > > > > > > > > <y...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > Hi Enrico,
> > > > > > > > >
> > > > > > > > > > > We also need a way to represent this as a String or a byte[]
> > > > > > > > >
> > > > > > > > > We already have the `toByteArray` method, right?
> > > > > > > >
> > > > > > > > > Yes, correct. So we are fine. I forgot about it and I answered too
> > > > > > > > > quickly.
> > > > > > > >
> > > > > > > > > I am not sure if this can be in the scope of this initiative, but we
> > > > > > > > > should somehow get rid of stuff like "fromByteArrayWithTopic" vs
> > > > > > > > > "fromByteArray".
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Enrico
> > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yunze
> > > > > > > > >
> > > > > > > > > > On Tue, Nov 8, 2022 at 6:43 PM Enrico Olivelli <eolive...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > On Tue, Nov 8, 2022 at 11:25, Yunze Xu
> > > > > > > > > > > <y...@streamnative.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > > Currently we have the following 5 implementations of MessageId:
> > > > > > > > > > > >
> > > > > > > > > > > > - MessageIdImpl: (ledger id, entry id, partition index)
> > > > > > > > > > > >   - BatchMessageIdImpl: adds (batch index, batch size, acker),
> > > > > > > > > > > >     where acker is a wrapper of a BitSet.
> > > > > > > > > > > >   - ChunkMessageIdImpl: adds another MessageIdImpl that represents
> > > > > > > > > > > >     the message ID of the first chunk.
> > > > > > > > > > > >   - MultiMessageIdImpl: adds a map that maps the topic name to the
> > > > > > > > > > > >     MessageId.
> > > > > > > > > > > > - TopicMessageIdImpl: adds the topic name and the partition name
> > > > > name
> > > > > > > > > > >
> > > > > > > > > > > > These implementations are such a mess. For example, when users get a
> > > > > > > > > > > > MessageId from `Producer#send`:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > var id = producer.send("msg");
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > > There is no getter for specific fields like the ledger id. You can
> > > > > > > > > > > > only see a representation from the `toString` method, with output
> > > > > > > > > > > > like "0:0:-1:0". Maybe this design is meant to hide some details, but
> > > > > > > > > > > > if users don't know details like ledger id and entry id, how could
> > > > > > > > > > > > they know what "0:0:-1:0" means? What if the implementation of
> > > > > > > > > > > > `MessageId#toString` changed? Should it be treated as a breaking
> > > > > > > > > > > > change?
> > > > > > > > > > >
> > > > > > > > > > > > The original definition of the underlying MessageIdData is much
> > > > > > > > > > > > clearer:
> > > > > > > > > > >
> > > > > > > > > > > ```proto
> > > > > > > > > > > message MessageIdData {
> > > > > > > > > > >     required uint64 ledgerId = 1;
> > > > > > > > > > >     required uint64 entryId  = 2;
> > > > > > > > > > >     optional int32 partition = 3 [default = -1];
> > > > > > > > > > >     optional int32 batch_index = 4 [default = -1];
> > > > > > > > > > >     repeated int64 ack_set = 5;
> > > > > > > > > > >     optional int32 batch_size = 6;
> > > > > > > > > > >
> > > > > > > > > > > >     // For the chunk message id, we need to specify the first
> > > > > > > > > > > >     // chunk message id.
> > > > > > > > > > >     optional MessageIdData first_chunk_message_id = 7;
> > > > > > > > > > > }
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > > IMO, MessageId should be a wrapper of MessageIdData. It's more natural
> > > > > > > > > > > > to have an interface like:
> > > > > > > > > > >
> > > > > > > > > > > ```java
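> > > > > > > > > > > > import java.util.Optional;
> > > > > > > > > > > >
> > > > > > > > > > > > // Proposed getters mirroring the MessageIdData fields.
> > > > > > > > > > > > interface MessageId {
> > > > > > > > > > > >     long ledgerId();
> > > > > > > > > > > >     long entryId();
> > > > > > > > > > > >     Optional<Integer> partition();
> > > > > > > > > > > >     Optional<Integer> batchIndex();
> > > > > > > > > > > >     // ...
> > > > > > > > > > > > }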
> > > > > > > > > > > ```
> > > > > > > > > >
> > > > > > > > > > > This is very good for client applications.
> > > > > > > > > > > We also need a way to represent this as a String or a byte[], so that
> > > > > > > > > > > client applications have a standard way to store message offsets in an
> > > > > > > > > > > external system (for instance, when you want to use the Reader API and
> > > > > > > > > > > keep track of the position yourself).
> > > > > > > > > >
> > > > > > > > > > Enrico
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > Additionally, there are many places that use only the triple of
> > > > > > > > > > > > (ledger id, entry id, batch index) as the key to represent the position.
> > > > > > > > > > > > Currently, this is done by adding a conversion from
> > > > > > > > > > > > BatchMessageIdImpl to MessageIdImpl. However, it's more intuitive to
> > > > > > > > > > > > write something like:
> > > > > > > > > > >
> > > > > > > > > > > ```java
> > > > > > > > > > > > class MessageIdPosition implements Comparable<MessageIdPosition> {
> > > > > > > > > > > >     private final MessageId messageId;
> > > > > > > > > > > >     // TODO: compare only the triple (ledger, entry, batch)
> > > > > > > > > > > > }
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > > Therefore, I'm going to write a proposal to redesign the MessageId
> > > > > > > > > > > > interface only by adding some getters. Regarding the 5 existing
> > > > > > > > > > > > implementations, I think we can drop them because they are a part
> > > > > > > > > > > > of `pulsar-client`, not `pulsar-client-api`.
> > > > > > > > > > >
> > > > > > > > > > > Please feel free to share your points.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yunze
> > > > > > >
> > > > >
> > > >
>
-- 
Best,
tison.
