> In this case, the consumer only can receive m1.

Regarding this comment, can you explain how the consumer would receive only m1?
Here, m1's and m2's uuid and msgId would be different (if we suffix the uuid
with a chunkingSessionId), even though the sequence ID is the same.
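
For concreteness, here is a minimal sketch of the uuid composition I have in
mind, following the earlier proposal in this thread (chunking uuid = producer +
sequence_id + chunkingSessionId). This is illustrative only, not the actual
client code; the class and method names are placeholders:

```
// Illustrative only: how the chunking uuid could be composed so that a resent
// chunked message (m2) gets a different uuid than the original (m1), even when
// the user reuses the same sequence ID after a producer restart.
final class ChunkUuidSketch {
    // chunkingSessionId is assumed to be a timestamp captured when chunking starts.
    static String buildChunkUuid(String producerName, long sequenceId, long chunkingSessionId) {
        return producerName + "-" + sequenceId + "-" + chunkingSessionId;
    }
}
```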

> If we throw an exception when users use the same sequence to send the
message.
Do you mean `If "producers" throw an exception when users use the same
sequence to send the message`?
Again, when producers restart, they lose track of the last sequence ID they sent.


> If we do not throw an exception when users use the same sequence to
send the message.

For this logic, how do we handle the case where the producer suddenly resends the
chunked message with a different chunking scheme (e.g. a different maxMessageSize)?
uuid=1, sid=0, cid=0   // chunks of the original send (m1)
uuid=1, sid=0, cid=1
uuid=2, sid=0, cid=0   // chunks of the resend (m2), new uuid, same sequence ID
uuid=2, sid=0, cid=1

We can refine exactly what to track and the algorithm on the broker side later, but
do we agree that broker-side chunk dedup logic is needed?
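
To make the question concrete, here is a rough sketch of the kind of
per-producer state the broker could track, assuming the chunkingSessionId-suffixed
uuid above. All names are placeholders, not a concrete design:

```
// Rough sketch (not a concrete design) of broker-side per-producer chunk dedup state.
public class ChunkDedupSketch {

    private long lastSequenceId = -1;   // already tracked today for regular dedup
    private String activeChunkUuid;     // uuid of the chunked message currently in flight
    private int lastChunkId = -1;       // last chunk id accepted for activeChunkUuid

    /** Decide whether an incoming chunk should be persisted or dropped as a duplicate. */
    public synchronized boolean shouldPersistChunk(String uuid, long sequenceId,
                                                   int chunkId, int totalChunks) {
        if (sequenceId < lastSequenceId) {
            return false; // older sequence: drop as a duplicate
        }
        if (!uuid.equals(activeChunkUuid)) {
            // A new uuid (e.g. a new chunkingSessionId suffix) starts a new chunking
            // session, even if the sequence ID is reused after a producer restart.
            activeChunkUuid = uuid;
            lastChunkId = -1;
        }
        if (chunkId != lastChunkId + 1) {
            return false; // out-of-order or duplicated chunk: drop
        }
        lastChunkId = chunkId;
        if (chunkId == totalChunks - 1) {
            // Last chunk: advance the sequence ID and reset the chunk tracking.
            lastSequenceId = sequenceId;
            activeChunkUuid = null;
            lastChunkId = -1;
        }
        return true;
    }
}
```

The only point this sketch is meant to make is that the broker needs some
chunk-level state (the current uuid and the last chunk ID) alongside the
sequence ID it already tracks; the exact rules can certainly be refined.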

I will keep thinking about this next week. Have a nice weekend.




On Fri, Aug 25, 2023 at 9:14 PM Xiangying Meng <xiangy...@apache.org> wrote:

> Hi Heesung,
>
> Maybe we only need to maintain the last chunk ID in a map.
> Map<producername, chunkID> map1.
> And we already have a map maintaining the last sequence ID.
> Map<producername, sequence ID> map2.
>
> If we do not throw an exception when users use the same sequence to
> send the message.
>
> For any incoming msg, m :
> chunk ID = -1;
> If m is a chunk message:
> chunk ID = m.chunkID.
>       If m.currentSeqid < LastSeqId, dedup.
>       If m.currentSeqid > LastSeqId && m.chunk ID = 0, nodedup
>                 if chunk ID exists in the map.
>                    Update it and log an error. This means there is an
> incomplete chunk message.
>                 If chunk ID does not exist in the map.
>                    Put it on the map.
>       If m.currentSeqid == LastSeqId,
>            1. if m.chunk ID == -1 || m.chunk ID == 0. dedup.
>            2. If 1 <= m.chunkID <= total chunk.
>               1. If chunk ID does not exist in the map. dedup.
>               2. If chunk ID exists in the map. dedup. Check the order
> of the chunkID to determine whether dedup;
>            3. If m.chunkID == total chunk, persistent the chunk and
> remove the chunkID in the map.
>
>
> If we throw an exception when users use the same sequence to send the
> message.
>
> For any incoming msg, m :
> chunk ID = 0;
> If m is a chunk message:
> chunk ID = m.chunkID.
>    If m.currentSeqid < LastSeqId, dedup.
>    If m.currentSeqid == LastSeqId.
>        If chunkID > 0, Check the last chunkID to determine whether to
> dedup.
>             If chunkID == 1, put chunkID into the map if absent.
>        IF chunkID = 0, dedup.
>
> BR,
> xiangying
>
> On Sat, Aug 26, 2023 at 11:53 AM Heesung Sohn
> <heesung.s...@streamnative.io.invalid> wrote:
> >
> > However, what If the producer jvm gets restarted after the broker
> persists
> > the m1 (but before updating their sequence id in their persistence
> > storage), and the producer is trying to resend the same msg(so m2) with
> the
> > same sequence id after restarting?
> >
> >
> >
> >
> >
> > On Fri, Aug 25, 2023 at 8:22 PM Xiangying Meng <xiangy...@apache.org>
> wrote:
> >
> > > Hi Heesung,
> > >
> > > In this case, the consumer only can receive m1.
> > >
> > > But it has the same content as the previous case: What should we do if
> > > the user sends messages with the sequence ID that was used previously?
> > >
> > > I am afraid to introduce the incompatibility in this case, so I only
> > > added a warning log in the PR[0] instead of throwing an exception.
> > > Regarding this matter, what do you think? Should we throw an exception
> > > or add error logs?
> > >
> > > I'm looking forward to hearing your viewpoint.
> > >
> > > Thanks,
> > > Xiangying
> > >
> > > [0] https://github.com/apache/pulsar/pull/21047
> > >
> > > On Sat, Aug 26, 2023 at 10:58 AM Heesung Sohn
> > > <heesung.s...@streamnative.io.invalid> wrote:
> > > >
> > > > Actually, can we think about this case too?
> > > >
> > > > What happens if the cx sends the same chunked msg with the same seq
> id
> > > when
> > > > dedup is enabled?
> > > >
> > > > // user send a chunked msg, m1
> > > > s1, c0
> > > > s1, c1
> > > > s1, c2 // complete
> > > >
> > > > // user resend the duplicate msg, m2
> > > > s1, c0
> > > > s1, c1
> > > > s1, c2 //complete
> > > >
> > > > Do consumers receive m1 and m2(no dedup)?
> > > >
> > > >
> > > >
> > > > On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng <xiangy...@apache.org
> >
> > > wrote:
> > > >
> > > > > Hi Heesung,
> > > > >
> > > > > >I think this means, for the PIP, the broker side's chunk
> > > deduplication.
> > > > > >I think brokers probably need to track map<uuid, last_chunk_id> to
> > > dedup
> > > > >
> > > > > What is the significance of doing this?
> > > > > My understanding is that if the client crashes and restarts after
> > > > > sending half of a chunk message and then it resends the previous
> chunk
> > > > > message, the resent chunk message should be treated as a new
> message
> > > > > since it calls the producer's API again.
> > > > > If deduplication is enabled, users should ensure that their
> customized
> > > > > sequence ID is not lower than the previous sequence ID.
> > > > > I have considered this scenario and added a warning log in PR[0].
> (I'm
> > > > > not sure whether an error log should be added or an exception
> thrown.)
> > > > > If deduplication is not enabled, on the consumer side, there
> should be
> > > > > an incomplete chunk message received alongside another complete
> chunk
> > > > > message, each with a different UUID, and they will not interfere
> with
> > > > > each other.
> > > > >
> > > > > My main point is that every message sent using
> > > > > `producer.newMessage().send()` should be treated as a new message.
> > > > > UUID is solely used for the consumer side to identify different
> chunk
> > > > > messages.
> > > > >
> > > > > BR
> > > > > Xiangying
> > > > >
> > > > > [0] https://github.com/apache/pulsar/pull/21047
> > > > >
> > > > > On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn
> > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > >
> > > > > > I think this means, for the PIP, the broker side's chunk
> > > deduplication.
> > > > > > I think brokers probably need to track map<uuid, last_chunk_id>
> to
> > > dedup
> > > > > > chunks on the broker side.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng <
> xiangy...@apache.org
> > > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Heesung
> > > > > > >
> > > > > > > It is a good point.
> > > > > > > Assume the producer application jvm restarts in the middle of
> > > chunking
> > > > > and
> > > > > > > resends the message chunks from the beginning with the previous
> > > > > sequence
> > > > > > > id.
> > > > > > >
> > > > > > > For the previous version, it should be:
> > > > > > >
> > > > > > > Producer send:
> > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > >
> > > > > > > Consumer receive:
> > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release
> this
> > > > > > > chunk and recycle its `chunkedMsgCtx`.
> > > > > > > 4. SequenceID: 0, ChunkID: 1  // chunkedMsgCtx == null Release
> it.
> > > > > > > 5. SequenceID: 0, ChunkID: 2  // chunkedMsgCtx == null Release
> it.
> > > > > > >
> > > > > > > Therefore, for the previous version, this chunk message can
> not be
> > > > > > > received by the consumer. It is not an incompatibility issue.
> > > > > > >
> > > > > > > However, the solution of optimizing the `uuid` is valuable to
> the
> > > new
> > > > > > > implementation.
> > > > > > > I will modify this in the PR[0]. Thank you very much for your
> > > reminder
> > > > > > > and the provided UUID optimization solution.
> > > > > > >
> > > > > > > BR,
> > > > > > > Xiangying
> > > > > > >
> > > > > > > [0] https://github.com/apache/pulsar/pull/20948
> > > > > > >
> > > > > > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn
> > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > Hi, I meant
> > > > > > > >
> > > > > > > > What if the producer application jvm restarts in the middle
> of
> > > > > chunking
> > > > > > > and
> > > > > > > > resends the message chunks from the beginning with the
> previous
> > > > > sequence
> > > > > > > id?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng <
> > > xiangy...@apache.org
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Heesung
> > > > > > > > >
> > > > > > > > > It is a good idea to cover this incompatibility case if the
> > > > > producer
> > > > > > > > > splits the chunk message again when retrying.
> > > > > > > > >
> > > > > > > > > But in fact, the producer only resents the chunks that are
> > > > > assembled
> > > > > > > > > to `OpSendMsg` instead of splitting the chunk message
> again.
> > > > > > > > > So, there is no incompatibility issue of resenting the
> chunk
> > > > > message
> > > > > > > > > by splitting the chunk message again.
> > > > > > > > >
> > > > > > > > > The logic of sending chunk messages can be found here:
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
> > > > > > > > >
> > > > > > > > > The logic of resending the message can be found here:
> > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
> > > > > > > > >
> > > > > > > > > BR,
> > > > > > > > > Xiangying
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn
> > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > >> I think brokers can track the last
> chunkMaxMessageSize for
> > > > > each
> > > > > > > > > producer.
> > > > > > > > > >
> > > > > > > > > > > Using different chunkMaxMessageSize is just one of the
> > > > > aspects. In
> > > > > > > > > > PIP-132 [0], we have included the message metadata size
> when
> > > > > checking
> > > > > > > > > > maxMessageSize.The message metadata can be changed after
> > > > > splitting
> > > > > > > the
> > > > > > > > > > chunks. We are still uncertain about the way the chunked
> > > message
> > > > > is
> > > > > > > > > split,
> > > > > > > > > > even using the same ss chunkMaxMessageSize.
> > > > > > > > > >
> > > > > > > > > > This sounds like we need to revisit chunking uuid logic.
> > > > > > > > > > Like I commented here,
> > > > > > > > > >
> > > https://github.com/apache/pulsar/pull/20948/files#r1305997883
> > > > > > > > > > Why don't we add a chunk session id suffix to identify
> the
> > > > > ongoing
> > > > > > > > > chunking
> > > > > > > > > > uniquely?
> > > > > > > > > >
> > > > > > > > > > Currently,
> > > > > > > > > >
> > > > > > > > > > chunking uuid = producer + sequence_id
> > > > > > > > > >
> > > > > > > > > > Proposal
> > > > > > > > > > chunking  uuid = producer + sequence_id +
> chunkingSessionId
> > > > > > > > > >
> > > > > > > > > > * chunkingSessionId could be a timestamp when the
> chunking
> > > > > started.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng <
> > > > > xiangy...@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Zike,
> > > > > > > > > > >
> > > > > > > > > > > >How would this happen to get two duplicated and
> > > consecutive
> > > > > > > ChunkID-1
> > > > > > > > > > > >messages? The producer should guarantee to retry the
> whole
> > > > > chunked
> > > > > > > > > > > >messages instead of some parts of the chunks.
> > > > > > > > > > >
> > > > > > > > > > > If the producer guarantees to retry the whole chunked
> > > messages
> > > > > > > instead
> > > > > > > > > > > of some parts of the chunks,
> > > > > > > > > > > Why doesn't the bug of the producer retry chunk
> messages in
> > > > > the PR
> > > > > > > [0]
> > > > > > > > > > > appear?
> > > > > > > > > > > And why do you need to set `chunkId` in
> `op.rePopulate`?
> > > > > > > > > > > It will be rested when split chunk messages again if
> the
> > > > > producer
> > > > > > > > > > > guarantees to retry the whole chunked messages.
> > > > > > > > > > > ```
> > > > > > > > > > > final MessageMetadata finalMsgMetadata = msgMetadata;
> > > > > > > > > > > op.rePopulate = () -> {
> > > > > > > > > > > if (msgMetadata.hasChunkId()) {
> > > > > > > > > > > // The message metadata is shared between all chunks
> in a
> > > large
> > > > > > > message
> > > > > > > > > > > // We need to reset the chunk id for each call of this
> > > method
> > > > > > > > > > > // It's safe to do that because there is only 1 thread
> to
> > > > > > > manipulate
> > > > > > > > > > > this message metadata
> > > > > > > > > > > finalMsgMetadata.setChunkId(chunkId);
> > > > > > > > > > > }
> > > > > > > > > > > op.cmd = sendMessage(producerId, sequenceId,
> numMessages,
> > > > > > > messageId,
> > > > > > > > > > > finalMsgMetadata,
> > > > > > > > > > > encryptedPayload);
> > > > > > > > > > > };
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > > >> But chunks 1, 2, 3, and 4 are still persisted in the
> > > topic.
> > > > > > > > > > > >
> > > > > > > > > > > >I think it's OK to persist them all on the topic. Is
> > > there any
> > > > > > > issue
> > > > > > > > > > > >with doing that?
> > > > > > > > > > >
> > > > > > > > > > > This is just one scenario. Whether only check the
> sequence
> > > ID
> > > > > of
> > > > > > > the
> > > > > > > > > > > first chunk (as I used in PR[1]) or check the sequence
> ID
> > > of
> > > > > the
> > > > > > > last
> > > > > > > > > > > chunk (as you suggested), in reality, neither of these
> > > methods
> > > > > can
> > > > > > > > > > > deduplicate chunks on the broker side because the
> broker
> > > cannot
> > > > > > > know
> > > > > > > > > > > the chunk ID of the previous message.
> > > > > > > > > > >
> > > > > > > > > > > However, if combined with the optimization of
> consumer-side
> > > > > logic,
> > > > > > > > > > > end-to-end deduplication can be completed.
> > > > > > > > > > > This is also a less-than-perfect solution I mentioned
> in my
> > > > > first
> > > > > > > > > > > email and implemented in PR[1].
> > > > > > > > > > >
> > > > > > > > > > > The reason I propose this proposal is not to solve the
> > > > > end-to-end
> > > > > > > > > > > deduplication of chunk messages between producers and
> > > > > consumers.
> > > > > > > That
> > > > > > > > > > > aspect has essentially been addressed in PR[1] and is
> still
> > > > > > > undergoing
> > > > > > > > > > > review.
> > > > > > > > > > >
> > > > > > > > > > > This proposal aims to ensure that no corrupt data
> exists
> > > > > within the
> > > > > > > > > > > topic, as our data might be offloaded and used
> elsewhere in
> > > > > > > scenarios
> > > > > > > > > > > where consumer logic is not optimized.
> > > > > > > > > > >
> > > > > > > > > > > BR,
> > > > > > > > > > > Xiangying
> > > > > > > > > > >
> > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21048
> > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <
> z...@apache.org
> > > >
> > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > HI xiangying
> > > > > > > > > > > >
> > > > > > > > > > > > > The rewind operation is seen in the test log.
> > > > > > > > > > > >
> > > > > > > > > > > > That seems weird. Not sure if this rewind is related
> to
> > > the
> > > > > chunk
> > > > > > > > > > > consuming.
> > > > > > > > > > > >
> > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > Such four chunks cannot be processed correctly by the
> > > > > consumer.
> > > > > > > > > > > >
> > > > > > > > > > > > How would this happen to get two duplicated and
> > > consecutive
> > > > > > > ChunkID-1
> > > > > > > > > > > > messages? The producer should guarantee to retry the
> > > whole
> > > > > > > chunked
> > > > > > > > > > > > messages instead of some parts of the chunks.
> > > > > > > > > > > >
> > > > > > > > > > > > > But chunks 1, 2, 3, and 4 are still persisted in
> the
> > > topic.
> > > > > > > > > > > >
> > > > > > > > > > > > I think it's OK to persist them all in the topic. Is
> > > there
> > > > > any
> > > > > > > issue
> > > > > > > > > > > > with doing that?
> > > > > > > > > > > >
> > > > > > > > > > > > > There is another point. The resend of the chunk
> message
> > > > > has a
> > > > > > > bug
> > > > > > > > > that
> > > > > > > > > > > > I shared with you, and you fixed in PR [0]. It will
> make
> > > this
> > > > > > > case
> > > > > > > > > > > > happen in another way.
> > > > > > > > > > > >
> > > > > > > > > > > > If the user sets the sequence ID manually, the case
> > > could be
> > > > > > > > > reproduced.
> > > > > > > > > > > >
> > > > > > > > > > > > BR,
> > > > > > > > > > > > Zike Yang
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <
> > > > > > > xiangy...@apache.org
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > >IIUC, this may change the existing behavior and
> may
> > > > > introduce
> > > > > > > > > > > inconsistencies.
> > > > > > > > > > > > > >Suppose that we have a large message with 3
> chunks.
> > > But
> > > > > the
> > > > > > > > > producer
> > > > > > > > > > > > > >crashes and resends the message after sending the
> > > > > chunk-1. It
> > > > > > > will
> > > > > > > > > > > > > >send a total of 5 messages to the Pulsar topic:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > >2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > >3. SequenceID: 0, ChunkID: 0   -> This message
> will be
> > > > > dropped
> > > > > > > > > > > > > >4. SequenceID: 0, ChunkID: 1    -> Will also be
> > > dropped
> > > > > > > > > > > > > >5. SequenceID: 0, ChunkID: 2    -> The last chunk
> of
> > > the
> > > > > > > message
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Zike
> > > > > > > > > > > > > There is another point. The resend of the chunk
> message
> > > > > has a
> > > > > > > bug
> > > > > > > > > that
> > > > > > > > > > > > > I shared with you, and you fixed in PR [0]. It will
> > > make
> > > > > this
> > > > > > > case
> > > > > > > > > > > > > happen in another way.
> > > > > > > > > > > > > Sample description for the  bug:
> > > > > > > > > > > > > Because the chunk message uses the same message
> > > metadata,
> > > > > if
> > > > > > > the
> > > > > > > > > chunk
> > > > > > > > > > > > > is not sent out immediately. Then, when resending,
> all
> > > > > chunks
> > > > > > > of
> > > > > > > > > the
> > > > > > > > > > > > > same chunk message use the chunk ID of the last
> chunk.
> > > > > > > > > > > > > In this case, It should happen as:
> > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 (Put op1 into
> > > > > `pendingMessages`
> > > > > > > and
> > > > > > > > > send)
> > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 (Put op2 into
> > > > > `pendingMessages`
> > > > > > > and
> > > > > > > > > send)
> > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into
> > > > > > > `pendingMessages`)
> > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> > > > > > > > > > > > > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > BR,
> > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > >
> > > > > > > > > > > > > [0] - https://github.com/apache/pulsar/pull/21048
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <
> > > > > > > > > xiangy...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> This solution also cannot solve the
> out-of-order
> > > > > messages
> > > > > > > > > inside
> > > > > > > > > > > the
> > > > > > > > > > > > > > >>chunks. For example, the above five messages
> will
> > > > > still be
> > > > > > > > > > > persisted.
> > > > > > > > > > > > > > >The consumer already handles this case. The
> above 5
> > > > > messages
> > > > > > > > > will
> > > > > > > > > > > all
> > > > > > > > > > > > > > >be persisted but the consumer will skip message
> 1
> > > and 2.
> > > > > > > > > > > > > > >For messages 3, 4, and 5. The producer can
> guarantee
> > > > > these
> > > > > > > > > chunks
> > > > > > > > > > > are in order.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The rewind operation is seen in the test log.
> Every
> > > time
> > > > > an
> > > > > > > > > incorrect
> > > > > > > > > > > > > > chunk message is received, it will rewind, and
> the
> > > code
> > > > > has
> > > > > > > yet
> > > > > > > > > to be
> > > > > > > > > > > > > > studied in depth.
> > > > > > > > > > > > > > If it does not call rewind, then this case is
> > > considered
> > > > > a
> > > > > > > > > workable
> > > > > > > > > > > > > > case. Let's look at another case.
> > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > Such four chunks cannot be processed correctly
> by the
> > > > > > > consumer.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In fact, this solution is my original idea. The
> PR I
> > > > > > > mentioned
> > > > > > > > > in the
> > > > > > > > > > > > > > first email above uses a similar solution and
> > > modifies
> > > > > the
> > > > > > > logic
> > > > > > > > > on
> > > > > > > > > > > > > > the consumer side.
> > > > > > > > > > > > > > Also, as I mentioned in the first email, this
> > > solution
> > > > > can
> > > > > > > only
> > > > > > > > > solve
> > > > > > > > > > > > > > the problem of end-to-end duplication. But
> chunks 1,
> > > 2,
> > > > > 3,
> > > > > > > and 4
> > > > > > > > > are
> > > > > > > > > > > > > > still persisted in the topic.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <
> > > > > z...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Heesung,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I believe in this PIP "similar to the
> existing
> > > > > "sequence
> > > > > > > ID
> > > > > > > > > map",
> > > > > > > > > > > > > > > to facilitate effective filtering" actually
> means
> > > > > tracking
> > > > > > > the
> > > > > > > > > last
> > > > > > > > > > > > > > > chunkId(not all chunk ids) on the broker side.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > With this simple solution, I think we don't
> need to
> > > > > track
> > > > > > > the
> > > > > > > > > > > > > > > (sequenceID, chunkID) on the broker side at
> all.
> > > The
> > > > > broker
> > > > > > > > > just
> > > > > > > > > > > needs
> > > > > > > > > > > > > > > to apply the deduplication logic to the last
> chunk
> > > > > instead
> > > > > > > of
> > > > > > > > > all
> > > > > > > > > > > > > > > previous chunks. This PIP actually could do
> that,
> > > but
> > > > > it
> > > > > > > will
> > > > > > > > > > > > > > > introduce a new data format and compatibility
> > > issue.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This is still a behavior change(deduping
> chunk
> > > > > messages
> > > > > > > on
> > > > > > > > > the
> > > > > > > > > > > broker),
> > > > > > > > > > > > > > > and I believe we need to discuss this addition
> as a
> > > > > PIP.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Actually, we didn't specifically state the
> deduping
> > > > > chunk
> > > > > > > > > message
> > > > > > > > > > > > > > > behavior before. The chunked message should be
> > > equally
> > > > > > > > > applicable
> > > > > > > > > > > to
> > > > > > > > > > > > > > > the de-duplication logic as a regular message.
> > > > > Therefore, I
> > > > > > > > > think
> > > > > > > > > > > it
> > > > > > > > > > > > > > > should be considered as a bug fix. But if this
> FIX
> > > is
> > > > > worth
> > > > > > > > > > > discussing
> > > > > > > > > > > > > > > in depth. I have no objection to it being a new
> > > PIP.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I think brokers can track the last
> > > > > chunkMaxMessageSize
> > > > > > > for
> > > > > > > > > > > > > > > each producer.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Using different chunkMaxMessageSize is just
> one of
> > > the
> > > > > > > > > aspects. In
> > > > > > > > > > > > > > > PIP-132 [0], we have included the message
> metadata
> > > size
> > > > > > > when
> > > > > > > > > > > checking
> > > > > > > > > > > > > > > maxMessageSize.
> > > > > > > > > > > > > > > The message metadata can be changed after
> > > splitting the
> > > > > > > > > chunks. We
> > > > > > > > > > > are
> > > > > > > > > > > > > > > still uncertain about the way the chunked
> message
> > > is
> > > > > split,
> > > > > > > > > even
> > > > > > > > > > > using
> > > > > > > > > > > > > > > the same ss chunkMaxMessageSize.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > then the brokers can assume that the
> producer is
> > > > > > > resending
> > > > > > > > > the
> > > > > > > > > > > chunks from
> > > > > > > > > > > > > > > the beginning with a different scheme(restarted
> > > with a
> > > > > > > > > different
> > > > > > > > > > > > > > > chunkMaxMessageSize) and accept those new
> chunks
> > > from
> > > > > the
> > > > > > > > > > > beginning.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Regarding this, it seems like we are
> implementing
> > > > > dynamic
> > > > > > > > > > > > > > > configuration for the chunkMaxMessageSize. I'm
> > > afraid
> > > > > that
> > > > > > > this
> > > > > > > > > > > would
> > > > > > > > > > > > > > > change the expected behavior and introduce more
> > > > > complexity
> > > > > > > to
> > > > > > > > > this
> > > > > > > > > > > > > > > configuration.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > [0]
> https://github.com/apache/pulsar/pull/14007
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <
> > > > > z...@apache.org
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi, xiangying
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > it will find that the message
> > > > > > > > > > > > > > > > is out of order and rewind the cursor. Loop
> this
> > > > > > > operation,
> > > > > > > > > and
> > > > > > > > > > > > > > > > discard this message after it expires
> instead of
> > > > > > > assembling
> > > > > > > > > 3,
> > > > > > > > > > > 4, 5
> > > > > > > > > > > > > > > > into a message.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Could you point out where the implementation
> for
> > > > > this?
> > > > > > > From
> > > > > > > > > my
> > > > > > > > > > > > > > > > understanding, there should not be any rewind
> > > > > operation
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > > > > > > > > chunking feature. You can check more detail
> here:
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > This solution also cannot solve the
> > > out-of-order
> > > > > > > messages
> > > > > > > > > > > inside the
> > > > > > > > > > > > > > > > chunks. For example, the above five messages
> will
> > > > > still
> > > > > > > be
> > > > > > > > > > > persisted.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The consumer already handles this case. The
> > > above 5
> > > > > > > messages
> > > > > > > > > > > will all
> > > > > > > > > > > > > > > > be persisted but the consumer will skip
> message 1
> > > > > and 2.
> > > > > > > > > > > > > > > > For messages 3, 4, and 5. The producer can
> > > guarantee
> > > > > > > these
> > > > > > > > > > > chunks are in order.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > > > > > > > > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > > > > > > > > > > > For the existing behavior, the consumer
> > > assembles
> > > > > > > > > > > > > > > > > > messages 3,4,5 into
> > > > > > > > > > > > > > > > > > the original large message. But the
> changes
> > > > > brought
> > > > > > > > > > > > > > > > > > about by this PIP
> > > > > > > > > > > > > > > > > > will cause the consumer to use messages
> > > 1,2,5 for
> > > > > > > > > > > > > > > > > > assembly. There is
> > > > > > > > > > > > > > > > > > no guarantee that the producer will
> split the
> > > > > message
> > > > > > > > > > > > > > > > > > in the same way
> > > > > > > > > > > > > > > > > > twice before and after. For example, the
> > > > > producer's
> > > > > > > > > > > > > > > > > > maxMessageSize may
> > > > > > > > > > > > > > > > > > be different. This may cause the
> consumer to
> > > > > > > > > > > > > > > > > > receive a corrupt
> > > > > > > > > > > > > > > > > > message.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Good point.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > > > > Yubiao Feng
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang
> <
> > > > > > > > > z...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi, xiangying,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for your PIP.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > IIUC, this may change the existing
> behavior
> > > and
> > > > > may
> > > > > > > > > introduce
> > > > > > > > > > > > > > > > > > inconsistencies.
> > > > > > > > > > > > > > > > > > Suppose that we have a large message
> with 3
> > > > > chunks.
> > > > > > > But
> > > > > > > > > the
> > > > > > > > > > > producer
> > > > > > > > > > > > > > > > > > crashes and resends the message after
> > > sending the
> > > > > > > > > chunk-1.
> > > > > > > > > > > It will
> > > > > > > > > > > > > > > > > > send a total of 5 messages to the Pulsar
> > > topic:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0   -> This
> > > message
> > > > > will
> > > > > > > be
> > > > > > > > > > > dropped
> > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1    -> Will
> also
> > > be
> > > > > > > dropped
> > > > > > > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2    -> The
> last
> > > > > chunk of
> > > > > > > the
> > > > > > > > > > > message
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > For the existing behavior, the consumer
> > > assembles
> > > > > > > > > messages
> > > > > > > > > > > 3,4,5 into
> > > > > > > > > > > > > > > > > > the original large message. But the
> changes
> > > > > brought
> > > > > > > > > about by
> > > > > > > > > > > this PIP
> > > > > > > > > > > > > > > > > > will cause the consumer to use messages
> > > 1,2,5 for
> > > > > > > > > assembly.
> > > > > > > > > > > There is
> > > > > > > > > > > > > > > > > > no guarantee that the producer will
> split the
> > > > > > > message in
> > > > > > > > > the
> > > > > > > > > > > same way
> > > > > > > > > > > > > > > > > > twice before and after. For example, the
> > > > > producer's
> > > > > > > > > > > maxMessageSize may
> > > > > > > > > > > > > > > > > > be different. This may cause the
> consumer to
> > > > > receive
> > > > > > > a
> > > > > > > > > > > corrupt
> > > > > > > > > > > > > > > > > > message.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Also, this PIP increases the complexity
> of
> > > > > handling
> > > > > > > > > chunks
> > > > > > > > > > > on the
> > > > > > > > > > > > > > > > > > broker side. Brokers should, in general,
> > > treat
> > > > > the
> > > > > > > chunk
> > > > > > > > > as
> > > > > > > > > > > a normal
> > > > > > > > > > > > > > > > > > message.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I think a simple better approach is to
> only
> > > > > check the
> > > > > > > > > > > deduplication
> > > > > > > > > > > > > > > > > > for the last chunk of the large message.
> The
> > > > > consumer
> > > > > > > > > only
> > > > > > > > > > > gets the
> > > > > > > > > > > > > > > > > > whole message after receiving the last
> > > chunk. We
> > > > > > > don't
> > > > > > > > > need
> > > > > > > > > > > to check
> > > > > > > > > > > > > > > > > > the deduplication for all previous
> chunks.
> > > Also
> > > > > by
> > > > > > > doing
> > > > > > > > > > > this we only
> > > > > > > > > > > > > > > > > > need bug fixes, we don't need to
> introduce a
> > > new
> > > > > PIP.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > > > > > Zike Yang
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying
> > > Meng <
> > > > > > > > > > > xiangy...@apache.org>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Dear Community,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I hope this email finds you well. I'd
> like
> > > to
> > > > > > > address
> > > > > > > > > an
> > > > > > > > > > > important
> > > > > > > > > > > > > > > > > > > issue related to Apache Pulsar and
> discuss
> > > a
> > > > > > > solution
> > > > > > > > > I've
> > > > > > > > > > > proposed on
> > > > > > > > > > > > > > > > > > > GitHub. The problem pertains to the
> > > handling of
> > > > > > > Chunk
> > > > > > > > > > > Messages after
> > > > > > > > > > > > > > > > > > > enabling deduplication.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > In the current version of Apache
> Pulsar,
> > > all
> > > > > > > chunks of
> > > > > > > > > a
> > > > > > > > > > > Chunk Message
> > > > > > > > > > > > > > > > > > > share the same sequence ID. However,
> > > enabling
> > > > > the
> > > > > > > > > > > depublication
> > > > > > > > > > > > > > > > > > > feature results in an inability to send
> > > Chunk
> > > > > > > > > Messages. To
> > > > > > > > > > > tackle this
> > > > > > > > > > > > > > > > > > > problem, I've proposed a solution [1]
> that
> > > > > ensures
> > > > > > > > > > > messages are not
> > > > > > > > > > > > > > > > > > > duplicated throughout end-to-end
> delivery.
> > > > > While
> > > > > > > this
> > > > > > > > > fix
> > > > > > > > > > > addresses
> > > > > > > > > > > > > > > > > > > the duplication issue for end-to-end
> > > messages,
> > > > > > > there
> > > > > > > > > > > remains a
> > > > > > > > > > > > > > > > > > > possibility of duplicate chunks within
> > > topics.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > To address this concern, I believe we
> > > should
> > > > > > > introduce
> > > > > > > > > a
> > > > > > > > > > > "Chunk ID
> > > > > > > > > > > > > > > > > > > map" at the Broker level, similar to
> the
> > > > > existing
> > > > > > > > > > > "sequence ID map",
> > > > > > > > > > > > > > > > > > > to facilitate effective filtering.
> However,
> > > > > > > > > implementing
> > > > > > > > > > > this has led
> > > > > > > > > > > > > > > > > > > to a challenge: a producer requires
> storage
> > > > > for two
> > > > > > > > > Long
> > > > > > > > > > > values
> > > > > > > > > > > > > > > > > > > simultaneously (sequence ID and chunk
> ID).
> > > > > Because
> > > > > > > the
> > > > > > > > > > > snapshot of the
> > > > > > > > > > > > > > > > > > > sequence ID map is stored through the
> > > > > properties
> > > > > > > of the
> > > > > > > > > > > cursor
> > > > > > > > > > > > > > > > > > > (Map<String, Long>), so in order to
> > > satisfy the
> > > > > > > > > storage of
> > > > > > > > > > > two Longs
> > > > > > > > > > > > > > > > > > > (sequence ID, chunk ID) corresponding
> to
> > > one
> > > > > > > producer,
> > > > > > > > > we
> > > > > > > > > > > hope to add
> > > > > > > > > > > > > > > > > > > a mark DeleteProperties (Map<String,
> Long>)
> > > > > String,
> > > > > > > > > > > String>) to
> > > > > > > > > > > > > > > > > > > replace the properties (Map<String,
> Long>)
> > > > > field.
> > > > > > > To
> > > > > > > > > > > resolve this,
> > > > > > > > > > > > > > > > > > > I've proposed an alternative proposal
> [2]
> > > > > > > involving the
> > > > > > > > > > > introduction
> > > > > > > > > > > > > > > > > > > of a "mark DeleteProperties"
> (Map<String,
> > > > > String>)
> > > > > > > to
> > > > > > > > > > > replace the
> > > > > > > > > > > > > > > > > > > current properties (Map<String, Long>)
> > > field.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I'd appreciate it if you carefully
> review
> > > both
> > > > > PRs
> > > > > > > and
> > > > > > > > > > > share your
> > > > > > > > > > > > > > > > > > > valuable feedback and insights. Thank
> you
> > > > > > > immensely for
> > > > > > > > > > > your time and
> > > > > > > > > > > > > > > > > > > attention. I eagerly anticipate your
> > > valuable
> > > > > > > opinions
> > > > > > > > > and
> > > > > > > > > > > > > > > > > > > recommendations.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Warm regards,
> > > > > > > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > [1]
> > > > > https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > > > > > > > > > [2]
> > > > > https://github.com/apache/pulsar/pull/21027
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > >
>
