Hi,
Can we clarify how the consumer can dedup the following duplicate chunked
messages?


Let's say we have a dedup-enabled topic here:
// producer sends a chunked msg, m1
s1, c0
s1, c1
s1, c2 // complete

// producer OOMs and is restarted without updating the sequence id to 2

// producer resends the msg (same content), m2, with the same sequence id
s1, c0
s1, c1
s1, c2 // complete


// consumer receives m1
s1, c0
s1, c1
s1, c2 // complete

// consumer OOMs and is restarted

// consumer receives m2
s1, c0
s1, c1
s1, c2 // complete

In this case, it seems the consumer receives the same msg twice, even
though the sequence id is the same.

Please confirm if I missed something here.
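To make the question concrete, here is a minimal, hypothetical sketch (the class and method names are mine, not the actual Pulsar broker code) of the "dedup only on the last chunk" approach discussed in the thread below: the broker tracks the highest persisted sequence id per producer and advances it only when the last chunk of a message is persisted, so every chunk of a resent message carrying an already-persisted sequence id is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of broker-side chunk deduplication. Dedup state is
// keyed on producer name -> highest fully persisted sequence id, and is
// only advanced when the LAST chunk of a message is persisted.
class ChunkDedupSketch {
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    // Returns true if the chunk should be persisted, false if it is a duplicate.
    boolean accept(String producerName, long sequenceId, int chunkId, int numChunks) {
        long last = lastSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            // The whole message with this sequence id was already persisted,
            // so every chunk of the resend is treated as a duplicate.
            return false;
        }
        if (chunkId == numChunks - 1) {
            // Last chunk persisted: the message is now complete, record it.
            lastSequenceId.put(producerName, sequenceId);
        }
        return true;
    }
}
```

Under this scheme the resend of m2 above would be dropped entirely by the broker and the consumer would only see m1; whether the real broker actually behaves this way is what I am trying to confirm.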

On Sun, Aug 27, 2023 at 9:55 PM Xiangying Meng <xiangy...@apache.org> wrote:

> Hi community,
>
> After internal discussions and agreement with Penghui and Zike, we've
> come to a consensus that a small amount of dirty data within the topic
> can be tolerated. As a result, this proposal has lost its
> significance. We are now closing this discussion. Thanks to all
> participants for their contributions. This has been a successful
> collaboration.
>
> Best regards,
> Xiangying
>
> On Mon, Aug 28, 2023 at 10:13 AM Xiangying Meng <xiangy...@apache.org>
> wrote:
> >
> > Hi Penghui,
> >
> > >From my understanding.
> > >The message deduplication should only check the last chunk of the
> message.
> >
> > Yes, I agree. If we only check the first chunk, the third chunk will be
> dropped.
> >
> > Thanks,
> > Xiangying
> >
> > On Mon, Aug 28, 2023 at 10:08 AM PengHui Li <peng...@apache.org> wrote:
> > >
> > > Hi Xiangying,
> > >
> > > Thanks for driving the proposal.
> > > From my understanding.
> > > The message deduplication should only check the last chunk of the
> message.
> > > It doesn't need to care about whether each chunk is duplicated.
> > > The client side should handle issues like duplicated chunks.
> > >
> > > For the example that you have discussed:
> > >
> > > ```
> > > Producer sent:
> > > 1. SequenceID: 0, ChunkID: 0
> > > 2. SequenceID: 0, ChunkID: 1
> > > 3. SequenceID: 0, ChunkID: 0
> > > 4. SequenceID: 0, ChunkID: 1
> > > 5. SequenceID: 0, ChunkID: 2
> > > ```
> > >
> > > The consumer should give up 1 and 2. And start to build the chunk
> message
> > > from 3 to 5.
> > > Because 1 and 2 belong to an incomplete chunk message.
> > >
> > > For the deduplication. If the chunkId 2 is the last chunk of the
> message.
> > > We should put it into the persistence map in the deduplication once it
> has
> > > been persistent.
> > > Any subsequent messages with the same sequence ID and producer name
> will be
> > > treated as
> > > duplicates, no matter whether the sequence ID is generated by the
> producer
> > > or specified by users.
> > >
> > > Regards,
> > > Penghui
> > >
> > > On Sat, Aug 26, 2023 at 5:55 PM Xiangying Meng <xiangy...@apache.org>
> wrote:
> > >
> > > > Share more information: Even for versions before 3.0.0, the approach
> > > > doesn't assemble chunks 3, 4, and 5 together.
> > > >
> > > > Please note this line of code:
> > > >
> > > > ```java
> > > > chunkedMsgCtx =
> chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
> > > >                     (key) -> ChunkedMessageCtx.get(totalChunks,
> > > > chunkedMsgBuffer));
> > > > ```
> > > >
> > > > And the new solution we adopted in the PR [0] is to add a timestamp
> in
> > > > the uuid. Thank Heesung for providing this idea again.
> > > >
> > > > [0]  https://github.com/apache/pulsar/pull/20948
> > > >
> > > >
> > > > On Sat, Aug 26, 2023 at 5:20 PM Xiangying Meng <xiangy...@apache.org
> >
> > > > wrote:
> > > > >
> > > > > Hi Zike,
> > > > >
> > > > > PR [0] has already fixed this bug and won't introduce compatibility
> > > > issues.
> > > > > PR [1] is unnecessary and can be closed. However, I still greatly
> > > > > appreciate the information you provided.
> > > > >
> > > > > [0] https://github.com/apache/pulsar/pull/20948
> > > > > [1] https://github.com/apache/pulsar/pull/21070
> > > > >
> > > > > On Sat, Aug 26, 2023 at 4:49 PM Zike Yang <z...@apache.org> wrote:
> > > > > >
> > > > > > > Hi Zike
> > > > > > You can see the processMessageChunk method of the ConsumerImpl.
> > > > > >
> > > > > > Ah. That seems like a regression bug introduced by
> > > > > > https://github.com/apache/pulsar/pull/18511. I have pushed a PR
> to fix
> > > > > > it: https://github.com/apache/pulsar/pull/21070
> > > > > >
> > > > > > For the behavior before Pulsar 3.0.0. The consumer should
> assemble the
> > > > > > message using 3,4,5.
> > > > > >
> > > > > > Thanks for pointing this out.
> > > > > >
> > > > > > BR,
> > > > > > Zike Yang
> > > > > >
> > > > > > On Sat, Aug 26, 2023 at 3:58 PM Xiangying Meng <
> xiangy...@apache.org>
> > > > wrote:
> > > > > > >
> > > > > > > >> Consumer receive:
> > > > > > > >1. SequenceID: 0, ChunkID: 0
> > > > > > > >2. SequenceID: 0, ChunkID: 1
> > > > > > > >3. SequenceID: 0, ChunkID: 0 // chunk ID out of order.
> Release this
> > > > > > > >chunk and recycle its `chunkedMsgCtx`.
> > > > > > > >4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null
> Release it.
> > > > > > > >5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null
> Release it.
> > > > > > > >
> > > > > > > >I think this case is wrong. For the current implementation,
> the
> > > > > > > >message 3,4,5 will be assembled as a original large message.
> > > > > > >
> > > > > > > Hi Zike
> > > > > > > You can see the processMessageChunk method of the ConsumerImpl.
> > > > > > >
> > > > > > > ```
> > > > > > >
> > > > > > > ChunkedMessageCtx chunkedMsgCtx =
> > > > chunkedMessagesMap.get(msgMetadata.getUuid());
> > > > > > >
> > > > > > > if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
> > > > > > >     //assemble a chunkedMsgCtx and put into
> > > > > > > pendingChunkedMessageUuidQueue and chunkedMessagesMap.
> > > > > > > }
> > > > > > >
> > > > > > > if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer ==
> null
> > > > > > >         || msgMetadata.getChunkId() !=
> > > > > > > (chunkedMsgCtx.lastChunkedMessageId + 1)) {
> > > > > > >     if (chunkedMsgCtx != null) {
> > > > > > >         if (chunkedMsgCtx.chunkedMsgBuffer != null) {
> > > > > > >
> > > >  ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
> > > > > > >         }
> > > > > > >         chunkedMsgCtx.recycle();
> > > > > > >     }
> > > > > > >     chunkedMessagesMap.remove(msgMetadata.getUuid());
> > > > > > >     compressedPayload.release();
> > > > > > >     increaseAvailablePermits(cnx);
> > > > > > > }
> > > > > > >
> > > > > > > ```
> > > > > > >
> > > > > > > On Sat, Aug 26, 2023 at 3:48 PM Zike Yang <z...@apache.org>
> wrote:
> > > > > > > >
> > > > > > > > > Consumer receive:
> > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order.
> Release this
> > > > > > > > chunk and recycle its `chunkedMsgCtx`.
> > > > > > > > 4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null
> Release it.
> > > > > > > > 5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null
> Release it.
> > > > > > > >
> > > > > > > > I think this case is wrong. For the current implementation,
> the
> > > > > > > > message 3,4,5 will be assembled as a original large message.
> > > > > > > >
> > > > > > > > HI, Heesung
> > > > > > > >
> > > > > > > >
> > > > > > > > > I think brokers probably need to track map<uuid,
> last_chunk_id>
> > > > to dedup
> > > > > > > >
> > > > > > > > I propose a simpler solution in this mail thread earlier,
> which
> > > > > > > > doesn't need to introduce map<uuid, last_chunk_id> :
> > > > > > > >
> > > > > > > > > I think a simple better approach is to only check the
> > > > deduplication
> > > > > > > > for the last chunk of the large message. The consumer only
> gets the
> > > > > > > > whole message after receiving the last chunk. We don't need
> to
> > > > check
> > > > > > > > the deduplication for all previous chunks. Also by doing
> this we
> > > > only
> > > > > > > > need bug fixes, we don't need to introduce a new PIP.
> > > > > > > >
> > > > > > > > Could you explain or show a case in what cases would lead to
> this
> > > > > > > > simpler solution not working?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Zike Yang
> > > > > > > >
> > > > > > > > On Sat, Aug 26, 2023 at 1:38 PM Heesung Sohn
> > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > In this case, the consumer only can receive m1.
> > > > > > > > >
> > > > > > > > > Regarding this comment, can you explain how the consumer
> only
> > > > receives m1?
> > > > > > > > > Here, m1's and m2's uuid and msgId will be different(if we
> > > > suffix with a
> > > > > > > > > chunkingSessionId), although the sequence id is the same.
> > > > > > > > >
> > > > > > > > > > If we throw an exception when users use the same
> sequence to
> > > > send the
> > > > > > > > > message.
> > > > > > > > > Do You mean `If "producers" throw an exception when users
> use
> > > > the same
> > > > > > > > > sequence to send the message.`.
> > > > > > > > > Again, when the producers restart, they lose the last
> sequence
> > > > id sent.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > If we do not throw an exception when users use the same
> > > > sequence to
> > > > > > > > > send the message.
> > > > > > > > >
> > > > > > > > > For this logic, how do we handle if the producer suddenly
> > > > resends the
> > > > > > > > > chunked message with a different chunking scheme(e.g.
> > > > maxMessageSize) ?
> > > > > > > > > uuid=1, sid=0, cid=0
> > > > > > > > > uuid=1, sid=0, cid=1
> > > > > > > > > uuid=2, sid=0, cid=0
> > > > > > > > > uuid=2, sid=0, cid=1
> > > > > > > > >
> > > > > > > > > We could refine what to track and algo logic on the broker
> side
> > > > more, but
> > > > > > > > > do we agree that the broker chunk dedup logic is needed?
> > > > > > > > >
> > > > > > > > > I will continue to think more next week. Have a nice
> weekend.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Aug 25, 2023 at 9:14 PM Xiangying Meng <
> > > > xiangy...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Heesung,
> > > > > > > > > >
> > > > > > > > > > Maybe we only need to maintain the last chunk ID in a
> map.
> > > > > > > > > > Map<producername, chunkID> map1.
> > > > > > > > > > And we already have a map maintaining the last sequence
> ID.
> > > > > > > > > > Map<producername, sequence ID> map2.
> > > > > > > > > >
> > > > > > > > > > If we do not throw an exception when users use the same
> > > > sequence to
> > > > > > > > > > send the message.
> > > > > > > > > >
> > > > > > > > > > For any incoming msg, m :
> > > > > > > > > > chunk ID = -1;
> > > > > > > > > > If m is a chunk message:
> > > > > > > > > > chunk ID = m.chunkID.
> > > > > > > > > >       If m.currentSeqid < LastSeqId, dedup.
> > > > > > > > > >       If m.currentSeqid > LastSeqId && m.chunk ID = 0,
> nodedup
> > > > > > > > > >                 if chunk ID exists in the map.
> > > > > > > > > >                    Update it and log an error. This means
> > > > there is an
> > > > > > > > > > incomplete chunk message.
> > > > > > > > > >                 If chunk ID does not exist in the map.
> > > > > > > > > >                    Put it on the map.
> > > > > > > > > >       If m.currentSeqid == LastSeqId,
> > > > > > > > > >            1. if m.chunk ID == -1 || m.chunk ID == 0.
> dedup.
> > > > > > > > > >            2. If 1 <= m.chunkID <= total chunk.
> > > > > > > > > >               1. If chunk ID does not exist in the map.
> dedup.
> > > > > > > > > >               2. If chunk ID exists in the map. dedup.
> Check
> > > > the order
> > > > > > > > > > of the chunkID to determine whether dedup;
> > > > > > > > > >            3. If m.chunkID == total chunk, persistent the
> > > > chunk and
> > > > > > > > > > remove the chunkID in the map.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > If we throw an exception when users use the same
> sequence to
> > > > send the
> > > > > > > > > > message.
> > > > > > > > > >
> > > > > > > > > > For any incoming msg, m :
> > > > > > > > > > chunk ID = 0;
> > > > > > > > > > If m is a chunk message:
> > > > > > > > > > chunk ID = m.chunkID.
> > > > > > > > > >    If m.currentSeqid < LastSeqId, dedup.
> > > > > > > > > >    If m.currentSeqid == LastSeqId.
> > > > > > > > > >        If chunkID > 0, Check the last chunkID to
> determine
> > > > whether to
> > > > > > > > > > dedup.
> > > > > > > > > >             If chunkID == 1, put chunkID into the map if
> > > > absent.
> > > > > > > > > >        IF chunkID = 0, dedup.
> > > > > > > > > >
> > > > > > > > > > BR,
> > > > > > > > > > xiangying
> > > > > > > > > >
> > > > > > > > > > On Sat, Aug 26, 2023 at 11:53 AM Heesung Sohn
> > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > However, what If the producer jvm gets restarted after
> the
> > > > broker
> > > > > > > > > > persists
> > > > > > > > > > > the m1 (but before updating their sequence id in their
> > > > persistence
> > > > > > > > > > > storage), and the producer is trying to resend the same
> > > > msg(so m2) with
> > > > > > > > > > the
> > > > > > > > > > > same sequence id after restarting?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 25, 2023 at 8:22 PM Xiangying Meng <
> > > > xiangy...@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Heesung,
> > > > > > > > > > > >
> > > > > > > > > > > > In this case, the consumer only can receive m1.
> > > > > > > > > > > >
> > > > > > > > > > > > But it has the same content as the previous case:
> What
> > > > should we do if
> > > > > > > > > > > > the user sends messages with the sequence ID that
> was used
> > > > previously?
> > > > > > > > > > > >
> > > > > > > > > > > > I am afraid to introduce the incompatibility in this
> case,
> > > > so I only
> > > > > > > > > > > > added a warning log in the PR[0] instead of throwing
> an
> > > > exception.
> > > > > > > > > > > > Regarding this matter, what do you think? Should we
> throw
> > > > an exception
> > > > > > > > > > > > or add error logs?
> > > > > > > > > > > >
> > > > > > > > > > > > I'm looking forward to hearing your viewpoint.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Xiangying
> > > > > > > > > > > >
> > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21047
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Aug 26, 2023 at 10:58 AM Heesung Sohn
> > > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Actually, can we think about this case too?
> > > > > > > > > > > > >
> > > > > > > > > > > > > What happens if the cx sends the same chunked msg
> with
> > > > the same seq
> > > > > > > > > > id
> > > > > > > > > > > > when
> > > > > > > > > > > > > dedup is enabled?
> > > > > > > > > > > > >
> > > > > > > > > > > > > // user send a chunked msg, m1
> > > > > > > > > > > > > s1, c0
> > > > > > > > > > > > > s1, c1
> > > > > > > > > > > > > s1, c2 // complete
> > > > > > > > > > > > >
> > > > > > > > > > > > > // user resend the duplicate msg, m2
> > > > > > > > > > > > > s1, c0
> > > > > > > > > > > > > s1, c1
> > > > > > > > > > > > > s1, c2 //complete
> > > > > > > > > > > > >
> > > > > > > > > > > > > Do consumers receive m1 and m2(no dedup)?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng <
> > > > xiangy...@apache.org
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Heesung,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >I think this means, for the PIP, the broker
> side's
> > > > chunk
> > > > > > > > > > > > deduplication.
> > > > > > > > > > > > > > >I think brokers probably need to track map<uuid,
> > > > last_chunk_id> to
> > > > > > > > > > > > dedup
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What is the significance of doing this?
> > > > > > > > > > > > > > My understanding is that if the client crashes
> and
> > > > restarts after
> > > > > > > > > > > > > > sending half of a chunk message and then it
> resends
> > > > the previous
> > > > > > > > > > chunk
> > > > > > > > > > > > > > message, the resent chunk message should be
> treated as
> > > > a new
> > > > > > > > > > message
> > > > > > > > > > > > > > since it calls the producer's API again.
> > > > > > > > > > > > > > If deduplication is enabled, users should ensure
> that
> > > > their
> > > > > > > > > > customized
> > > > > > > > > > > > > > sequence ID is not lower than the previous
> sequence ID.
> > > > > > > > > > > > > > I have considered this scenario and added a
> warning
> > > > log in PR[0].
> > > > > > > > > > (I'm
> > > > > > > > > > > > > > not sure whether an error log should be added or
> an
> > > > exception
> > > > > > > > > > thrown.)
> > > > > > > > > > > > > > If deduplication is not enabled, on the consumer
> side,
> > > > there
> > > > > > > > > > should be
> > > > > > > > > > > > > > an incomplete chunk message received alongside
> another
> > > > complete
> > > > > > > > > > chunk
> > > > > > > > > > > > > > message, each with a different UUID, and they
> will not
> > > > interfere
> > > > > > > > > > with
> > > > > > > > > > > > > > each other.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > My main point is that every message sent using
> > > > > > > > > > > > > > `producer.newMessage().send()` should be treated
> as a
> > > > new message.
> > > > > > > > > > > > > > UUID is solely used for the consumer side to
> identify
> > > > different
> > > > > > > > > > chunk
> > > > > > > > > > > > > > messages.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > BR
> > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21047
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn
> > > > > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think this means, for the PIP, the broker
> side's
> > > > chunk
> > > > > > > > > > > > deduplication.
> > > > > > > > > > > > > > > I think brokers probably need to track
> map<uuid,
> > > > last_chunk_id>
> > > > > > > > > > to
> > > > > > > > > > > > dedup
> > > > > > > > > > > > > > > chunks on the broker side.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng
> <
> > > > > > > > > > xiangy...@apache.org
> > > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Heesung
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It is a good point.
> > > > > > > > > > > > > > > > Assume the producer application jvm restarts
> in
> > > > the middle of
> > > > > > > > > > > > chunking
> > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > resends the message chunks from the
> beginning with
> > > > the previous
> > > > > > > > > > > > > > sequence
> > > > > > > > > > > > > > > > id.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > For the previous version, it should be:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Producer send:
> > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Consumer receive:
> > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out
> of
> > > > order. Release
> > > > > > > > > > this
> > > > > > > > > > > > > > > > chunk and recycle its `chunkedMsgCtx`.
> > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1  //
> chunkedMsgCtx ==
> > > > null Release
> > > > > > > > > > it.
> > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2  //
> chunkedMsgCtx ==
> > > > null Release
> > > > > > > > > > it.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Therefore, for the previous version, this
> chunk
> > > > message can
> > > > > > > > > > not be
> > > > > > > > > > > > > > > > received by the consumer. It is not an
> > > > incompatibility issue.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > However, the solution of optimizing the
> `uuid` is
> > > > valuable to
> > > > > > > > > > the
> > > > > > > > > > > > new
> > > > > > > > > > > > > > > > implementation.
> > > > > > > > > > > > > > > > I will modify this in the PR[0]. Thank you
> very
> > > > much for your
> > > > > > > > > > > > reminder
> > > > > > > > > > > > > > > > and the provided UUID optimization solution.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > [0]
> https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn
> > > > > > > > > > > > > > > > <heesung.s...@streamnative.io.invalid>
> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi, I meant
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > What if the producer application jvm
> restarts in
> > > > the middle
> > > > > > > > > > of
> > > > > > > > > > > > > > chunking
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > resends the message chunks from the
> beginning
> > > > with the
> > > > > > > > > > previous
> > > > > > > > > > > > > > sequence
> > > > > > > > > > > > > > > > id?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying
> Meng <
> > > > > > > > > > > > xiangy...@apache.org
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Heesung
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > It is a good idea to cover this
> > > > incompatibility case if the
> > > > > > > > > > > > > > producer
> > > > > > > > > > > > > > > > > > splits the chunk message again when
> retrying.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > But in fact, the producer only resents
> the
> > > > chunks that are
> > > > > > > > > > > > > > assembled
> > > > > > > > > > > > > > > > > > to `OpSendMsg` instead of splitting the
> chunk
> > > > message
> > > > > > > > > > again.
> > > > > > > > > > > > > > > > > > So, there is no incompatibility issue of
> > > > resenting the
> > > > > > > > > > chunk
> > > > > > > > > > > > > > message
> > > > > > > > > > > > > > > > > > by splitting the chunk message again.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The logic of sending chunk messages can
> be
> > > > found here:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > >
> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The logic of resending the message can be
> > > > found here:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > >
> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung
> Sohn
> > > > > > > > > > > > > > > > > > <heesung.s...@streamnative.io.invalid>
> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >> I think brokers can track the last
> > > > > > > > > > chunkMaxMessageSize for
> > > > > > > > > > > > > > each
> > > > > > > > > > > > > > > > > > producer.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Using different chunkMaxMessageSize
> is
> > > > just one of the
> > > > > > > > > > > > > > aspects. In
> > > > > > > > > > > > > > > > > > > PIP-132 [0], we have included the
> message
> > > > metadata size
> > > > > > > > > > when
> > > > > > > > > > > > > > checking
> > > > > > > > > > > > > > > > > > > maxMessageSize.The message metadata
> can be
> > > > changed after
> > > > > > > > > > > > > > splitting
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > chunks. We are still uncertain about
> the way
> > > > the chunked
> > > > > > > > > > > > message
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > split,
> > > > > > > > > > > > > > > > > > > even using the same ss
> chunkMaxMessageSize.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > This sounds like we need to revisit
> chunking
> > > > uuid logic.
> > > > > > > > > > > > > > > > > > > Like I commented here,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > https://github.com/apache/pulsar/pull/20948/files#r1305997883
> > > > > > > > > > > > > > > > > > > Why don't we add a chunk session id
> suffix
> > > > to identify
> > > > > > > > > > the
> > > > > > > > > > > > > > ongoing
> > > > > > > > > > > > > > > > > > chunking
> > > > > > > > > > > > > > > > > > > uniquely?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Currently,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > chunking uuid = producer + sequence_id
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Proposal
> > > > > > > > > > > > > > > > > > > chunking  uuid = producer +
> sequence_id +
> > > > > > > > > > chunkingSessionId
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > * chunkingSessionId could be a
> timestamp
> > > > when the
> > > > > > > > > > chunking
> > > > > > > > > > > > > > started.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:02 AM
> Xiangying
> > > > Meng <
> > > > > > > > > > > > > > xiangy...@apache.org
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi Zike,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >How would this happen to get two
> > > > duplicated and
> > > > > > > > > > > > consecutive
> > > > > > > > > > > > > > > > ChunkID-1
> > > > > > > > > > > > > > > > > > > > >messages? The producer should
> guarantee
> > > > to retry the
> > > > > > > > > > whole
> > > > > > > > > > > > > > chunked
> > > > > > > > > > > > > > > > > > > > >messages instead of some parts of
> the
> > > > chunks.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > If the producer guarantees to retry
> the
> > > > whole chunked
> > > > > > > > > > > > messages
> > > > > > > > > > > > > > > > instead
> > > > > > > > > > > > > > > > > > > > of some parts of the chunks,
> > > > > > > > > > > > > > > > > > > > Why doesn't the bug of the producer
> retry
> > > > chunk
> > > > > > > > > > messages in
> > > > > > > > > > > > > > the PR
> > > > > > > > > > > > > > > > [0]
> > > > > > > > > > > > > > > > > > > > appear?
> > > > > > > > > > > > > > > > > > > > And why do you need to set `chunkId`
> in
> > > > > > > > > > `op.rePopulate`?
> > > > > > > > > > > > > > > > > > > > It will be rested when split chunk
> > > > messages again if
> > > > > > > > > > the
> > > > > > > > > > > > > > producer
> > > > > > > > > > > > > > > > > > > > guarantees to retry the whole chunked
> > > > messages.
> > > > > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > > > > final MessageMetadata
> finalMsgMetadata =
> > > > msgMetadata;
> > > > > > > > > > > > > > > > > > > > op.rePopulate = () -> {
> > > > > > > > > > > > > > > > > > > > if (msgMetadata.hasChunkId()) {
> > > > > > > > > > > > > > > > > > > > // The message metadata is shared
> between
> > > > all chunks
> > > > > > > > > > in a
> > > > > > > > > > > > large
> > > > > > > > > > > > > > > > message
> > > > > > > > > > > > > > > > > > > > // We need to reset the chunk id for
> each
> > > > call of this
> > > > > > > > > > > > method
> > > > > > > > > > > > > > > > > > > > // It's safe to do that because
> there is
> > > > only 1 thread
> > > > > > > > > > to
> > > > > > > > > > > > > > > > manipulate
> > > > > > > > > > > > > > > > > > > > this message metadata
> > > > > > > > > > > > > > > > > > > > finalMsgMetadata.setChunkId(chunkId);
> > > > > > > > > > > > > > > > > > > >     }
> > > > > > > > > > > > > > > > > > > >     op.cmd = sendMessage(producerId, sequenceId, numMessages,
> > > > > > > > > > > > > > > > > > > >             messageId, finalMsgMetadata, encryptedPayload);
> > > > > > > > > > > > > > > > > > > > };
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >> But chunks 1, 2, 3, and 4 are still persisted in the topic.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >I think it's OK to persist them all on the topic. Is there any issue
> > > > > > > > > > > > > > > > > > > > >with doing that?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > This is just one scenario. Whether we only check the sequence ID of
> > > > > > > > > > > > > > > > > > > > the first chunk (as I used in PR[1]) or check the sequence ID of the
> > > > > > > > > > > > > > > > > > > > last chunk (as you suggested), in reality neither of these methods
> > > > > > > > > > > > > > > > > > > > can deduplicate chunks on the broker side, because the broker cannot
> > > > > > > > > > > > > > > > > > > > know the chunk ID of the previous message.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > However, if combined with the optimization of the consumer-side
> > > > > > > > > > > > > > > > > > > > logic, end-to-end deduplication can be completed. This is also the
> > > > > > > > > > > > > > > > > > > > less-than-perfect solution I mentioned in my first email and
> > > > > > > > > > > > > > > > > > > > implemented in PR[1].
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > The reason I propose this proposal is not to solve the end-to-end
> > > > > > > > > > > > > > > > > > > > deduplication of chunked messages between producers and consumers.
> > > > > > > > > > > > > > > > > > > > That aspect has essentially been addressed in PR[1] and is still
> > > > > > > > > > > > > > > > > > > > undergoing review.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > This proposal aims to ensure that no corrupt data exists within the
> > > > > > > > > > > > > > > > > > > > topic, as our data might be offloaded and used elsewhere in
> > > > > > > > > > > > > > > > > > > > scenarios where the consumer logic is not optimized.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21048
> > > > > > > > > > > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hi xiangying,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > The rewind operation is seen in the test log.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > That seems weird. I am not sure this rewind is related to chunk
> > > > > > > > > > > > > > > > > > > > > consuming.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > > > > > > > > > Such four chunks cannot be processed correctly by the consumer.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > How would this happen, so that we get two duplicated and consecutive
> > > > > > > > > > > > > > > > > > > > > ChunkID-1 messages? The producer should guarantee that it retries the
> > > > > > > > > > > > > > > > > > > > > whole chunked message instead of only some of the chunks.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > But chunks 1, 2, 3, and 4 are still persisted in the topic.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I think it's OK to persist them all in the topic. Is there any issue
> > > > > > > > > > > > > > > > > > > > > with doing that?
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > There is another point. The resend of the chunk message has a bug
> > > > > > > > > > > > > > > > > > > > > > that I shared with you, and you fixed in PR [0]. It will make this
> > > > > > > > > > > > > > > > > > > > > > case happen in another way.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > If the user sets the sequence ID manually, the case could be
> > > > > > > > > > > > > > > > > > > > > reproduced.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >IIUC, this may change the existing behavior and may introduce inconsistencies.
> > > > > > > > > > > > > > > > > > > > > > >Suppose that we have a large message with 3 chunks. But the producer
> > > > > > > > > > > > > > > > > > > > > > >crashes and resends the message after sending chunk-1. It will
> > > > > > > > > > > > > > > > > > > > > > >send a total of 5 messages to the Pulsar topic:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > >1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > > > > > >2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > >3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > > > > > > > > > > > > > > > > > > > >4. SequenceID: 0, ChunkID: 1   -> Will also be dropped
> > > > > > > > > > > > > > > > > > > > > > >5. SequenceID: 0, ChunkID: 2   -> The last chunk of the message
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > Hi Zike,
> > > > > > > > > > > > > > > > > > > > > > There is another point. The resend of the chunk message has a bug
> > > > > > > > > > > > > > > > > > > > > > that I shared with you, and that you fixed in PR [0]. It will make
> > > > > > > > > > > > > > > > > > > > > > this case happen in another way.
> > > > > > > > > > > > > > > > > > > > > > A short description of the bug:
> > > > > > > > > > > > > > > > > > > > > > Because all chunks of a chunked message share the same message
> > > > > > > > > > > > > > > > > > > > > > metadata, if a chunk is not sent out immediately, then when
> > > > > > > > > > > > > > > > > > > > > > resending, all chunks of the same chunked message carry the chunk
> > > > > > > > > > > > > > > > > > > > > > ID of the last chunk.
> > > > > > > > > > > > > > > > > > > > > > In this case, it happens as:
> > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0   -> (Put op1 into `pendingMessages` and send)
> > > > > > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1   -> (Put op2 into `pendingMessages` and send)
> > > > > > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into `pendingMessages`)
> > > > > > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> > > > > > > > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> > > > > > > > > > > > > > > > > > > > > > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > [0] - https://github.com/apache/pulsar/pull/21048
> > > > > > > > > > > > > > > > > > > > > >
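The op1/op2/op3 sequence above follows from all chunks sharing one mutable metadata object: each op keeps a reference to it, so by resend time every op sees the last chunk ID written. A minimal, self-contained sketch of that failure mode and of a per-op fix — the class and field names here are hypothetical stand-ins, not the actual Pulsar client types:

```java
import java.util.ArrayList;
import java.util.List;

public class SharedMetadataBug {
    // Hypothetical stand-ins for the client's MessageMetadata and pending op.
    static class Metadata { int chunkId; }
    static class Op { Metadata meta; Op(Metadata m) { this.meta = m; } }

    // Buggy variant: every op references the SAME metadata object, so when the
    // pending ops are (re)sent, all of them carry the last chunkId written.
    static List<Integer> sendBuggy(int numChunks) {
        Metadata shared = new Metadata();
        List<Op> pending = new ArrayList<>();
        for (int chunkId = 0; chunkId < numChunks; chunkId++) {
            shared.chunkId = chunkId;      // mutates the shared object in place
            pending.add(new Op(shared));   // op keeps a reference, not a copy
        }
        List<Integer> wire = new ArrayList<>();
        for (Op op : pending) wire.add(op.meta.chunkId); // send/resend all ops
        return wire;
    }

    // Fixed variant: capture the chunk ID per op at enqueue time.
    static List<Integer> sendFixed(int numChunks) {
        List<Op> pending = new ArrayList<>();
        for (int chunkId = 0; chunkId < numChunks; chunkId++) {
            Metadata copy = new Metadata();
            copy.chunkId = chunkId;        // each op owns its own snapshot
            pending.add(new Op(copy));
        }
        List<Integer> wire = new ArrayList<>();
        for (Op op : pending) wire.add(op.meta.chunkId);
        return wire;
    }

    public static void main(String[] args) {
        System.out.println(sendBuggy(3)); // all three chunks carry chunkId 2
        System.out.println(sendFixed(3)); // chunk IDs 0, 1, 2 preserved
    }
}
```

The fix amounts to snapshotting the metadata fields into each pending operation before the shared object is mutated for the next chunk, which is the shape of change the referenced PR makes.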
> > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > >> This solution also cannot solve the out-of-order messages inside
> > > > > > > > > > > > > > > > > > > > > > > >> the chunks. For example, the above five messages will still be
> > > > > > > > > > > > > > > > > > > > > > > >> persisted.
> > > > > > > > > > > > > > > > > > > > > > > >The consumer already handles this case. The above 5 messages will
> > > > > > > > > > > > > > > > > > > > > > > >all be persisted but the consumer will skip messages 1 and 2.
> > > > > > > > > > > > > > > > > > > > > > > >For messages 3, 4, and 5, the producer can guarantee these chunks
> > > > > > > > > > > > > > > > > > > > > > > >are in order.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > The rewind operation is seen in the test log. Every time an
> > > > > > > > > > > > > > > > > > > > > > > incorrect chunk message is received, it will rewind, and that code
> > > > > > > > > > > > > > > > > > > > > > > has yet to be studied in depth.
> > > > > > > > > > > > > > > > > > > > > > > If it does not call rewind, then this case is considered a workable
> > > > > > > > > > > > > > > > > > > > > > > case. Let's look at another case:
> > > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > > > > > > > > > > Such four chunks cannot be processed correctly by the consumer.
> > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > In fact, this solution is my original idea. The PR I mentioned in
> > > > > > > > > > > > > > > > > > > > > > > the first email above uses a similar solution and modifies the
> > > > > > > > > > > > > > > > > > > > > > > logic on the consumer side.
> > > > > > > > > > > > > > > > > > > > > > > Also, as I mentioned in the first email, this solution can only
> > > > > > > > > > > > > > > > > > > > > > > solve the problem of end-to-end duplication. Chunks 1, 2, 3, and 4
> > > > > > > > > > > > > > > > > > > > > > > are still persisted in the topic.
> > > > > > > > > > > > > > > > > > > > > > >
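A simplified model of consumer-side assembly shows why the 0, 1, 1, 2 sequence above cannot be processed, while a full resend (0, 1, 0, 1, 2) can. This is a sketch of the ordering rule only, under the assumption of a single in-flight chunked message; the real Pulsar client keeps a context per chunked-message UUID and also applies expiry:

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkAssembler {
    private StringBuilder buffer = null;  // pending chunked-message context
    private int expectedChunkId = 0;

    /** Feed one chunk; returns the assembled payload on the last chunk, else null. */
    public String onChunk(int chunkId, int totalChunks, String payload) {
        if (chunkId == 0) {                 // first chunk starts a fresh context
            buffer = new StringBuilder();
            expectedChunkId = 0;
        } else if (buffer == null || chunkId != expectedChunkId) {
            buffer = null;                  // duplicate/out-of-order chunk: drop context
            return null;
        }
        buffer.append(payload);
        expectedChunkId = chunkId + 1;
        if (chunkId == totalChunks - 1) {   // last chunk completes the message
            String msg = buffer.toString();
            buffer = null;
            return msg;
        }
        return null;
    }

    public static void main(String[] args) {
        ChunkAssembler a = new ChunkAssembler();
        List<String> delivered = new ArrayList<>();
        // The 0, 1, 1, 2 case: the duplicate chunk 1 discards the context,
        // and chunk 2 then has no context, so nothing is ever delivered.
        for (int id : new int[] {0, 1, 1, 2}) {
            String m = a.onChunk(id, 3, "c" + id);
            if (m != null) delivered.add(m);
        }
        System.out.println(delivered);      // []
        // A full resend (0, 1, 0, 1, 2): chunk 0 restarts the context and the
        // message is delivered exactly once.
        for (int id : new int[] {0, 1, 0, 1, 2}) {
            String m = a.onChunk(id, 3, "c" + id);
            if (m != null) delivered.add(m);
        }
        System.out.println(delivered);      // [c0c1c2]
    }
}
```

Under this rule a whole-message retry is harmless, but a duplicated single chunk (as in 0, 1, 1, 2) silently loses the message, which is the failure Xiangying is pointing at.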
> > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Hi Heesung,
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I believe in this PIP "similar to the existing "sequence ID
> > > > > > > > > > > > > > > > > > > > > > > > > map", to facilitate effective filtering" actually means tracking
> > > > > > > > > > > > > > > > > > > > > > > > > the last chunkId (not all chunk ids) on the broker side.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > With this simple solution, I think we don't need to track the
> > > > > > > > > > > > > > > > > > > > > > > > (sequenceID, chunkID) pairs on the broker side at all. The broker
> > > > > > > > > > > > > > > > > > > > > > > > just needs to apply the deduplication logic to the last chunk
> > > > > > > > > > > > > > > > > > > > > > > > instead of all previous chunks. This PIP actually could do that,
> > > > > > > > > > > > > > > > > > > > > > > > but it will introduce a new data format and a compatibility issue.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > This is still a behavior change (deduping chunk messages on the
> > > > > > > > > > > > > > > > > > > > > > > > > broker), and I believe we need to discuss this addition as a PIP.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Actually, we didn't specifically state the chunk-message
> > > > > > > > > > > > > > > > > > > > > > > > deduplication behavior before. The deduplication logic should
> > > > > > > > > > > > > > > > > > > > > > > > apply to a chunked message just as it does to a regular message.
> > > > > > > > > > > > > > > > > > > > > > > > Therefore, I think it should be considered a bug fix. But if this
> > > > > > > > > > > > > > > > > > > > > > > > fix is worth discussing in depth, I have no objection to it being
> > > > > > > > > > > > > > > > > > > > > > > > a new PIP.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > I think brokers can track the last chunkMaxMessageSize for each
> > > > > > > > > > > > > > > > > > > > > > > > > producer.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Using a different chunkMaxMessageSize is just one of the aspects.
> > > > > > > > > > > > > > > > > > > > > > > > In PIP-132 [0], we included the message metadata size when checking
> > > > > > > > > > > > > > > > > > > > > > > > maxMessageSize. The message metadata can change after splitting the
> > > > > > > > > > > > > > > > > > > > > > > > chunks, so we are still uncertain that a chunked message is split
> > > > > > > > > > > > > > > > > > > > > > > > the same way even when using the same chunkMaxMessageSize.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > then the brokers can assume that the producer is resending the
> > > > > > > > > > > > > > > > > > > > > > > > > chunks from the beginning with a different scheme (restarted with
> > > > > > > > > > > > > > > > > > > > > > > > > a different chunkMaxMessageSize) and accept those new chunks from
> > > > > > > > > > > > > > > > > > > > > > > > > the beginning.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > Regarding this, it seems like we would be implementing dynamic
> > > > > > > > > > > > > > > > > > > > > > > > configuration for chunkMaxMessageSize. I'm afraid that this would
> > > > > > > > > > > > > > > > > > > > > > > > change the expected behavior and introduce more complexity to this
> > > > > > > > > > > > > > > > > > > > > > > > configuration.
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/14007
> > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > > > > > > > > > > >
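The concern about a restarted producer using a different chunkMaxMessageSize can be made concrete with a little arithmetic. This sketch ignores the per-chunk metadata overhead that PIP-132 also counts toward maxMessageSize, so the numbers are illustrative only:

```java
public class ChunkSplit {
    // Number of chunks a payload splits into for a given max chunk size
    // (ceiling division; metadata overhead deliberately ignored here).
    static int numChunks(int payloadSize, int chunkMaxMessageSize) {
        return (payloadSize + chunkMaxMessageSize - 1) / chunkMaxMessageSize;
    }

    public static void main(String[] args) {
        int payload = 10_000;
        System.out.println(numChunks(payload, 4_000)); // 3 chunks before restart
        System.out.println(numChunks(payload, 3_000)); // 4 chunks after restart
        // Chunk 1 of the first split covers bytes 4000-7999; chunk 1 of the
        // second covers bytes 3000-5999. Mixing chunks from the two resends
        // therefore reassembles into a corrupt payload.
    }
}
```

This is why chunk IDs from two sends of the "same" message are not comparable on the broker side: identical (sequenceId, chunkId) pairs can refer to different byte ranges.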
> > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Hi, xiangying
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > it will find that the message is out of order and rewind the
> > > > > > > > > > > > > > > > > > > > > > > > > > cursor. Loop this operation, and discard this message after it
> > > > > > > > > > > > > > > > > > > > > > > > > > expires instead of assembling 3, 4, 5 into a message.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > Could you point out where the implementation for this is? From my
> > > > > > > > > > > > > > > > > > > > > > > > > understanding, there should not be any rewind operation for the
> > > > > > > > > > > > > > > > > > > > > > > > > chunking feature. You can check more detail here:
> > > > > > > > > > > > > > > > > > > > > > > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > This solution also cannot solve the out-of-order messages inside
> > > > > > > > > > > > > > > > > > > > > > > > > > the chunks. For example, the above five messages will still be
> > > > > > > > > > > > > > > > > > > > > > > > > > persisted.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > The consumer already handles this case. The above 5 messages will
> > > > > > > > > > > > > > > > > > > > > > > > > all be persisted, but the consumer will skip messages 1 and 2.
> > > > > > > > > > > > > > > > > > > > > > > > > For messages 3, 4, and 5, the producer can guarantee these chunks
> > > > > > > > > > > > > > > > > > > > > > > > > are in order.
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > > > > > > > > > > > > > > > > > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > > > > > > > > > > > > > > For the existing behavior, the consumer assembles messages
> > > > > > > > > > > > > > > > > > > > > > > > > > > 3,4,5 into the original large message. But the changes brought
> > > > > > > > > > > > > > > > > > > > > > > > > > > about by this PIP will cause the consumer to use messages 1,2,5
> > > > > > > > > > > > > > > > > > > > > > > > > > > for assembly. There is no guarantee that the producer will split
> > > > > > > > > > > > > > > > > > > > > > > > > > > the message in the same way twice before and after. For example,
> > > > > > > > > > > > > > > > > > > > > > > > > > > the producer's maxMessageSize may be different. This may cause
> > > > > > > > > > > > > > > > > > > > > > > > > > > the consumer to receive a corrupt message.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Good point.
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > > > > > > > > > > > > > Yubiao Feng
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, xiangying,
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your PIP.
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > IIUC, this may change
> the
> > > > existing
> > > > > > > > > > behavior
> > > > > > > > > > > > and
> > > > > > > > > > > > > > may
> > > > > > > > > > > > > > > > > > introduce
> > > > > > > > > > > > > > > > > > > > > > > > > > > inconsistencies.
> > > > > > > > > > > > > > > > > > > > > > > > > > > Suppose that we have a
> large
> > > > message
> > > > > > > > > > with 3
> > > > > > > > > > > > > > chunks.
> > > > > > > > > > > > > > > > But
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > producer
> > > > > > > > > > > > > > > > > > > > > > > > > > > crashes and resends the
> > > > message after
> > > > > > > > > > > > sending the
> > > > > > > > > > > > > > > > > > chunk-1.
> > > > > > > > > > > > > > > > > > > > It will
> > > > > > > > > > > > > > > > > > > > > > > > > > > send a total of 5
> messages
> > > > to the Pulsar
> > > > > > > > > > > > topic:
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0,
> ChunkID: 0
> > > > > > > > > > > > > > > > > > > > > > > > > > > 2. SequenceID: 0,
> ChunkID: 1
> > > > > > > > > > > > > > > > > > > > > > > > > > > 3. SequenceID: 0,
> ChunkID:
> > > > 0   -> This
> > > > > > > > > > > > message
> > > > > > > > > > > > > > will
> > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > > dropped
> > > > > > > > > > > > > > > > > > > > > > > > > > > 4. SequenceID: 0,
> ChunkID:
> > > > 1    -> Will
> > > > > > > > > > also
> > > > > > > > > > > > be
> > > > > > > > > > > > > > > > dropped
> > > > > > > > > > > > > > > > > > > > > > > > > > > 5. SequenceID: 0,
> ChunkID:
> > > > 2    -> The
> > > > > > > > > > last
> > > > > > > > > > > > > > chunk of
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > message
> > > > > > > > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > > > > > > > For the existing
> behavior,
> > > > the consumer
> > > > > > > > > > > > assembles
> > > > > > > > > > > > > > > > > > messages
> > > > > > > > > > > > > > > > > > > > 3,4,5 into
> > > > > > > > > > > > > > > > > > > > > > > > > > > the original large
> message.
> > > > But the
> > > > > > > > > > changes
> > > > > > > > > > > > > > brought
> > > > > > > > > > > > > > > > > > about by
> > > > > > > > > > > > > > > > > > > > this PIP
> > > > > > > > > > > > > > > > > > > > > > > > > > > will cause the
> consumer to
> > > > use messages
> > > > > > > > > > > > 1,2,5 for
> > > > > > > > > > > > > > > > > > assembly.
> > > > > > > > > > > > > > > > > > > > There is
> > > > > > > > > > > > > > > > > > > > > > > > > > > no guarantee that the
> > > > producer will
> > > > > > > > > > split the
> > > > > > > > > > > > > > > > message in
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > same way
> > > > > > > > > > > > > > > > > > > > > > > > > > > twice before and
> after. For
> > > > example, the
> > > > > > > > > > > > > > producer's
> > > > > > > > > > > > > > > > > > > > maxMessageSize may
> > > > > > > > > > > > > > > > > > > > > > > > > > > be different. This may
> cause
> > > > the
> > > > > > > > > > consumer to
> > > > > > > > > > > > > > receive
> > > > > > > > > > > > > > > > a
> corrupt message.
>
> Also, this PIP increases the complexity of handling chunks on the
> broker side. Brokers should, in general, treat the chunk as a normal
> message.
>
> I think a simpler and better approach is to only check the
> deduplication for the last chunk of the large message. The consumer
> only gets the whole message after receiving the last chunk. We don't
> need to check the deduplication for all previous chunks. Also, by
> doing this we only need bug fixes; we don't need to introduce a new
> PIP.
>
> BR,
> Zike Yang
>
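[Editor's note: the last-chunk-only check described above can be sketched roughly as follows. This is a minimal illustration of the idea, not Pulsar's actual broker code; the class and method names (`LastChunkDedup`, `shouldPersist`) and the parameters are hypothetical stand-ins for the metadata a broker would inspect.]

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "check deduplication only on the last chunk": earlier chunks
// of an in-flight chunked message are stored without advancing the
// per-producer dedup state; only the final chunk advances it, so a full
// resend of an already-completed message is rejected chunk by chunk.
public class LastChunkDedup {
    // Highest sequence id already fully persisted per producer,
    // mirroring the per-producer sequence id map mentioned in the thread.
    private final Map<String, Long> lastSequenceId = new HashMap<>();

    /** Returns true if the chunk should be persisted, false if it
     *  belongs to a duplicate message and must be dropped. */
    public boolean shouldPersist(String producerName, long sequenceId,
                                 int chunkId, int numChunks) {
        long last = lastSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= last) {
            return false; // duplicate of a completed message: drop
        }
        if (chunkId == numChunks - 1) {
            // Last chunk received: the message is complete, so the
            // dedup state advances and any resend will be rejected.
            lastSequenceId.put(producerName, sequenceId);
        }
        return true;
    }
}
```

With this sketch, all three chunks of a first send pass, while every chunk of a resend with the same sequence id is dropped, which matches the behavior discussed above.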
> On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <xiangy...@apache.org>
> wrote:
> >
> > Dear Community,
> >
> > I hope this email finds you well. I'd like to address an important
> > issue related to Apache Pulsar and discuss a solution I've proposed
> > on GitHub. The problem pertains to the handling of Chunk Messages
> > after enabling deduplication.
> >
> > In the current version of Apache Pulsar, all chunks of a Chunk
> > Message share the same sequence ID. However, enabling the
> > deduplication feature results in an inability to send Chunk
> > Messages. To tackle this problem, I've proposed a solution [1] that
> > ensures messages are not duplicated throughout end-to-end delivery.
> > While this fix addresses the duplication issue for end-to-end
> > messages, there remains a possibility of duplicate chunks within
> > topics.
> >
> > To address this concern, I believe we should introduce a "Chunk ID
> > map" at the Broker level, similar to the existing "sequence ID map",
> > to facilitate effective filtering. However, implementing this has
> > led to a challenge: a producer requires storage for two Long values
> > simultaneously (sequence ID and chunk ID). Because the snapshot of
> > the sequence ID map is stored through the properties of the cursor
> > (Map<String, Long>), storing two Longs (sequence ID, chunk ID) per
> > producer does not fit this field. To resolve this, I've proposed an
> > alternative proposal [2] introducing a "mark DeleteProperties"
> > (Map<String, String>) to replace the current properties
> > (Map<String, Long>) field.
> >
> > I'd appreciate it if you carefully review both PRs and share your
> > valuable feedback and insights. Thank you immensely for your time
> > and attention. I eagerly anticipate your valuable opinions and
> > recommendations.
> >
> > Warm regards,
> > Xiangying
> >
> > [1] https://github.com/apache/pulsar/pull/20948
> > [2] https://github.com/apache/pulsar/pull/21027
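[Editor's note: the Map<String, Long> to Map<String, String> change proposed above is motivated by needing to snapshot two values per producer in a single cursor property. One way to do that is to pack both longs into one string value; the sketch below is purely illustrative, and the `"seq:chunk"` encoding and class name `ChunkDedupSnapshot` are hypothetical, not what the linked PRs actually implement.]

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: packs (sequenceId, chunkId) into a single string so
// a Map<String, String> cursor property can hold both values for one
// producer, where the old Map<String, Long> could hold only one Long.
public class ChunkDedupSnapshot {
    private final Map<String, String> properties = new HashMap<>();

    // Store both values under the producer's key as "sequenceId:chunkId".
    public void snapshot(String producerName, long sequenceId, long chunkId) {
        properties.put(producerName, sequenceId + ":" + chunkId);
    }

    // Parse the packed string back into {sequenceId, chunkId}.
    public long[] restore(String producerName) {
        String[] parts = properties.get(producerName).split(":");
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }
}
```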
