To share more information: even for versions before 3.0.0, the approach doesn't assemble chunks 3, 4, and 5 together.

Please note this line of code:

```java
chunkedMsgCtx = chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
        (key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
```

The new solution we adopted in PR [0] is to add a timestamp to the uuid. Thanks again to Heesung for providing this idea.

[0] https://github.com/apache/pulsar/pull/20948

On Sat, Aug 26, 2023 at 5:20 PM Xiangying Meng <xiangy...@apache.org> wrote:
>
> Hi Zike,
>
> PR [0] has already fixed this bug and won't introduce compatibility issues.
> PR [1] is unnecessary and can be closed. However, I still greatly
> appreciate the information you provided.
>
> [0] https://github.com/apache/pulsar/pull/20948
> [1] https://github.com/apache/pulsar/pull/21070
>
> On Sat, Aug 26, 2023 at 4:49 PM Zike Yang <z...@apache.org> wrote:
> >
> > > Hi Zike
> > > You can see the processMessageChunk method of the ConsumerImpl.
> >
> > Ah. That seems like a regression bug introduced by
> > https://github.com/apache/pulsar/pull/18511. I have pushed a PR to fix
> > it: https://github.com/apache/pulsar/pull/21070
> >
> > For the behavior before Pulsar 3.0.0, the consumer should assemble the
> > message using chunks 3, 4, and 5.
> >
> > Thanks for pointing this out.
> >
> > BR,
> > Zike Yang
> >
> > On Sat, Aug 26, 2023 at 3:58 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > >
> > > > Consumer receive:
> > > > 1. SequenceID: 0, ChunkID: 0
> > > > 2. SequenceID: 0, ChunkID: 1
> > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release this
> > > > chunk and recycle its `chunkedMsgCtx`.
> > > > 4. SequenceID: 0, ChunkID: 1 // chunkedMsgCtx == null. Release it.
> > > > 5. SequenceID: 0, ChunkID: 2 // chunkedMsgCtx == null. Release it.
> > > >
> > > > I think this case is wrong. For the current implementation,
> > > > messages 3, 4, and 5 will be assembled into the original large message.
> > >
> > > Hi Zike,
> > > You can see the processMessageChunk method of the ConsumerImpl.
> > >
> > > ```java
> > > ChunkedMessageCtx chunkedMsgCtx =
> > >         chunkedMessagesMap.get(msgMetadata.getUuid());
> > >
> > > if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
> > >     // assemble a chunkedMsgCtx and put it into
> > >     // pendingChunkedMessageUuidQueue and chunkedMessagesMap.
> > > }
> > >
> > > if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
> > >         || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
> > >     if (chunkedMsgCtx != null) {
> > >         if (chunkedMsgCtx.chunkedMsgBuffer != null) {
> > >             ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
> > >         }
> > >         chunkedMsgCtx.recycle();
> > >     }
> > >     chunkedMessagesMap.remove(msgMetadata.getUuid());
> > >     compressedPayload.release();
> > >     increaseAvailablePermits(cnx);
> > > }
> > > ```
> > >
> > > On Sat, Aug 26, 2023 at 3:48 PM Zike Yang <z...@apache.org> wrote:
> > > >
> > > > > Consumer receive:
> > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. Release this
> > > > > chunk and recycle its `chunkedMsgCtx`.
> > > > > 4. SequenceID: 0, ChunkID: 1 // chunkedMsgCtx == null. Release it.
> > > > > 5. SequenceID: 0, ChunkID: 2 // chunkedMsgCtx == null. Release it.
> > > >
> > > > I think this case is wrong. For the current implementation,
> > > > messages 3, 4, and 5 will be assembled into the original large message.
> > > >
> > > > Hi, Heesung
> > > >
> > > > > I think brokers probably need to track map<uuid, last_chunk_id> to
> > > > > dedup
> > > >
> > > > I proposed a simpler solution earlier in this mail thread, which
> > > > doesn't need to introduce map<uuid, last_chunk_id>:
> > > >
> > > > > I think a simpler, better approach is to only check the deduplication
> > > > > for the last chunk of the large message. The consumer only gets the
> > > > > whole message after receiving the last chunk. We don't need to check
> > > > > the deduplication for all previous chunks. Also, by doing this we only
> > > > > need bug fixes; we don't need to introduce a new PIP.
> > > >
> > > > Could you explain or show a case in which this simpler solution would
> > > > not work?
> > > >
> > > > Thanks,
> > > > Zike Yang
> > > >
> > > > On Sat, Aug 26, 2023 at 1:38 PM Heesung Sohn
> > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > >
> > > > > > In this case, the consumer can only receive m1.
> > > > >
> > > > > Regarding this comment, can you explain how the consumer only
> > > > > receives m1? Here, m1's and m2's uuid and msgId will be different (if
> > > > > we suffix with a chunkingSessionId), although the sequence id is the
> > > > > same.
> > > > >
> > > > > > If we throw an exception when users use the same sequence to send
> > > > > > the message.
> > > > >
> > > > > Do you mean `If "producers" throw an exception when users use the
> > > > > same sequence to send the message`? Again, when the producers
> > > > > restart, they lose the last sequence id sent.
> > > > >
> > > > > > If we do not throw an exception when users use the same sequence to
> > > > > > send the message.
> > > > >
> > > > > For this logic, how do we handle it if the producer suddenly resends
> > > > > the chunked message with a different chunking scheme (e.g.
> > > > > maxMessageSize)?
> > > > > uuid=1, sid=0, cid=0
> > > > > uuid=1, sid=0, cid=1
> > > > > uuid=2, sid=0, cid=0
> > > > > uuid=2, sid=0, cid=1
> > > > >
> > > > > We could further refine what to track and the algorithm logic on the
> > > > > broker side, but do we agree that the broker chunk dedup logic is
> > > > > needed?
> > > > >
> > > > > I will continue to think more next week. Have a nice weekend.
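The "dedup only on the last chunk" idea proposed above can be sketched in a few lines. This is an illustrative model only, not Pulsar's actual broker code; the class and method names (`LastChunkDedup`, `shouldDrop`) are hypothetical. The broker keeps its usual per-producer highest sequence id, but a chunked message is judged a duplicate only when its final chunk arrives, since the consumer delivers the assembled message only after receiving that last chunk:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "check deduplication only for the last chunk".
// Intermediate chunks are always persisted; only the final chunk of a
// large message is compared against the producer's highest sequence id.
public class LastChunkDedup {
    private final Map<String, Long> highestSequenceId = new HashMap<>();

    /** Returns true if this chunk should be dropped as a duplicate. */
    public boolean shouldDrop(String producerName, long sequenceId,
                              int chunkId, int numChunks) {
        boolean lastChunk = (chunkId == numChunks - 1);
        if (!lastChunk) {
            return false; // intermediate chunks are never deduplicated
        }
        long highest = highestSequenceId.getOrDefault(producerName, -1L);
        if (sequenceId <= highest) {
            return true; // the whole logical message was already completed
        }
        highestSequenceId.put(producerName, sequenceId);
        return false;
    }
}
```

Under this model, a full resend of an already-completed chunked message persists its intermediate chunks again, but its last chunk is dropped, so the consumer never assembles a second copy; whether that trade-off is acceptable is exactly what this thread debates.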
> > > > > On Fri, Aug 25, 2023 at 9:14 PM Xiangying Meng <xiangy...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > Hi Heesung,
> > > > > >
> > > > > > Maybe we only need to maintain the last chunk ID in a map:
> > > > > > Map<producerName, chunkID> map1.
> > > > > > And we already have a map maintaining the last sequence ID:
> > > > > > Map<producerName, sequenceID> map2.
> > > > > >
> > > > > > If we do not throw an exception when users use the same sequence to
> > > > > > send the message:
> > > > > >
> > > > > > For any incoming msg, m:
> > > > > >     chunkID = -1
> > > > > >     If m is a chunk message:
> > > > > >         chunkID = m.chunkID
> > > > > >     If m.currentSeqId < lastSeqId, dedup.
> > > > > >     If m.currentSeqId > lastSeqId && m.chunkID == 0, no dedup.
> > > > > >         If chunkID exists in the map:
> > > > > >             Update it and log an error. This means there is an
> > > > > >             incomplete chunk message.
> > > > > >         If chunkID does not exist in the map:
> > > > > >             Put it in the map.
> > > > > >     If m.currentSeqId == lastSeqId:
> > > > > >         1. If m.chunkID == -1 || m.chunkID == 0, dedup.
> > > > > >         2. If 1 <= m.chunkID <= totalChunks:
> > > > > >             1. If chunkID does not exist in the map, dedup.
> > > > > >             2. If chunkID exists in the map, check the order of the
> > > > > >                chunkID to determine whether to dedup.
> > > > > >         3. If m.chunkID == totalChunks, persist the chunk and
> > > > > >            remove the chunkID from the map.
> > > > > >
> > > > > > If we throw an exception when users use the same sequence to send
> > > > > > the message:
> > > > > >
> > > > > > For any incoming msg, m:
> > > > > >     chunkID = 0
> > > > > >     If m is a chunk message:
> > > > > >         chunkID = m.chunkID
> > > > > >     If m.currentSeqId < lastSeqId, dedup.
> > > > > >     If m.currentSeqId == lastSeqId:
> > > > > >         If chunkID > 0, check the last chunkID to determine whether
> > > > > >         to dedup.
> > > > > >         If chunkID == 1, put chunkID into the map if absent.
> > > > > >         If chunkID == 0, dedup.
> > > > > >
> > > > > > BR,
> > > > > > xiangying
> > > > > >
> > > > > > On Sat, Aug 26, 2023 at 11:53 AM Heesung Sohn
> > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > However, what if the producer jvm gets restarted after the broker
> > > > > > > persists m1 (but before updating the sequence id in its
> > > > > > > persistence storage), and the producer is trying to resend the
> > > > > > > same msg (so m2) with the same sequence id after restarting?
> > > > > > >
> > > > > > > On Fri, Aug 25, 2023 at 8:22 PM Xiangying Meng
> > > > > > > <xiangy...@apache.org> wrote:
> > > > > > > >
> > > > > > > > Hi Heesung,
> > > > > > > >
> > > > > > > > In this case, the consumer can only receive m1.
> > > > > > > >
> > > > > > > > But it has the same content as the previous case: what should
> > > > > > > > we do if the user sends messages with a sequence ID that was
> > > > > > > > used previously?
> > > > > > > >
> > > > > > > > I am afraid of introducing an incompatibility in this case, so
> > > > > > > > I only added a warning log in PR [0] instead of throwing an
> > > > > > > > exception. Regarding this matter, what do you think? Should we
> > > > > > > > throw an exception or add error logs?
> > > > > > > >
> > > > > > > > I'm looking forward to hearing your viewpoint.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xiangying
> > > > > > > >
> > > > > > > > [0] https://github.com/apache/pulsar/pull/21047
> > > > > > > >
> > > > > > > > On Sat, Aug 26, 2023 at 10:58 AM Heesung Sohn
> > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > Actually, can we think about this case too?
> > > > > > > > > > > > > > > > > > What happens if the cx sends the same chunked msg with the > > > > > > > > > same seq > > > > > > id > > > > > > > > when > > > > > > > > > dedup is enabled? > > > > > > > > > > > > > > > > > > // user send a chunked msg, m1 > > > > > > > > > s1, c0 > > > > > > > > > s1, c1 > > > > > > > > > s1, c2 // complete > > > > > > > > > > > > > > > > > > // user resend the duplicate msg, m2 > > > > > > > > > s1, c0 > > > > > > > > > s1, c1 > > > > > > > > > s1, c2 //complete > > > > > > > > > > > > > > > > > > Do consumers receive m1 and m2(no dedup)? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:55 PM Xiangying Meng > > > > > > > > > <xiangy...@apache.org > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Heesung, > > > > > > > > > > > > > > > > > > > > >I think this means, for the PIP, the broker side's chunk > > > > > > > > deduplication. > > > > > > > > > > >I think brokers probably need to track map<uuid, > > > > > > > > > > >last_chunk_id> to > > > > > > > > dedup > > > > > > > > > > > > > > > > > > > > What is the significance of doing this? > > > > > > > > > > My understanding is that if the client crashes and restarts > > > > > > > > > > after > > > > > > > > > > sending half of a chunk message and then it resends the > > > > > > > > > > previous > > > > > > chunk > > > > > > > > > > message, the resent chunk message should be treated as a new > > > > > > message > > > > > > > > > > since it calls the producer's API again. > > > > > > > > > > If deduplication is enabled, users should ensure that their > > > > > > customized > > > > > > > > > > sequence ID is not lower than the previous sequence ID. > > > > > > > > > > I have considered this scenario and added a warning log in > > > > > > > > > > PR[0]. > > > > > > (I'm > > > > > > > > > > not sure whether an error log should be added or an > > > > > > > > > > exception > > > > > > thrown.) 
> > > > > > > > > > If deduplication is not enabled, on the consumer side, there > > > > > > should be > > > > > > > > > > an incomplete chunk message received alongside another > > > > > > > > > > complete > > > > > > chunk > > > > > > > > > > message, each with a different UUID, and they will not > > > > > > > > > > interfere > > > > > > with > > > > > > > > > > each other. > > > > > > > > > > > > > > > > > > > > My main point is that every message sent using > > > > > > > > > > `producer.newMessage().send()` should be treated as a new > > > > > > > > > > message. > > > > > > > > > > UUID is solely used for the consumer side to identify > > > > > > > > > > different > > > > > > chunk > > > > > > > > > > messages. > > > > > > > > > > > > > > > > > > > > BR > > > > > > > > > > Xiangying > > > > > > > > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21047 > > > > > > > > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 9:34 AM Heesung Sohn > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > > > > I think this means, for the PIP, the broker side's chunk > > > > > > > > deduplication. > > > > > > > > > > > I think brokers probably need to track map<uuid, > > > > > > > > > > > last_chunk_id> > > > > > > to > > > > > > > > dedup > > > > > > > > > > > chunks on the broker side. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:16 PM Xiangying Meng < > > > > > > xiangy...@apache.org > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi Heesung > > > > > > > > > > > > > > > > > > > > > > > > It is a good point. 
> > > > > > > > > > > > Assume the producer application jvm restarts in the > > > > > > > > > > > > middle of > > > > > > > > chunking > > > > > > > > > > and > > > > > > > > > > > > resends the message chunks from the beginning with the > > > > > > > > > > > > previous > > > > > > > > > > sequence > > > > > > > > > > > > id. > > > > > > > > > > > > > > > > > > > > > > > > For the previous version, it should be: > > > > > > > > > > > > > > > > > > > > > > > > Producer send: > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 > > > > > > > > > > > > > > > > > > > > > > > > Consumer receive: > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 0 // chunk ID out of order. > > > > > > > > > > > > Release > > > > > > this > > > > > > > > > > > > chunk and recycle its `chunkedMsgCtx`. > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 1 // chunkedMsgCtx == null > > > > > > > > > > > > Release > > > > > > it. > > > > > > > > > > > > 5. SequenceID: 0, ChunkID: 2 // chunkedMsgCtx == null > > > > > > > > > > > > Release > > > > > > it. > > > > > > > > > > > > > > > > > > > > > > > > Therefore, for the previous version, this chunk message > > > > > > > > > > > > can > > > > > > not be > > > > > > > > > > > > received by the consumer. It is not an incompatibility > > > > > > > > > > > > issue. > > > > > > > > > > > > > > > > > > > > > > > > However, the solution of optimizing the `uuid` is > > > > > > > > > > > > valuable to > > > > > > the > > > > > > > > new > > > > > > > > > > > > implementation. > > > > > > > > > > > > I will modify this in the PR[0]. 
> > > > > > > > > > > > Thank you very much for your reminder and the provided
> > > > > > > > > > > > UUID optimization solution.
> > > > > > > > > > > >
> > > > > > > > > > > > BR,
> > > > > > > > > > > > Xiangying
> > > > > > > > > > > >
> > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Aug 26, 2023 at 8:48 AM Heesung Sohn
> > > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi, I meant
> > > > > > > > > > > > >
> > > > > > > > > > > > > What if the producer application jvm restarts in the
> > > > > > > > > > > > > middle of chunking and resends the message chunks
> > > > > > > > > > > > > from the beginning with the previous sequence id?
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:15 PM Xiangying Meng
> > > > > > > > > > > > > <xiangy...@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Heesung,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > It is a good idea to cover this incompatibility
> > > > > > > > > > > > > > case if the producer splits the chunk message again
> > > > > > > > > > > > > > when retrying.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > But in fact, the producer only resends the chunks
> > > > > > > > > > > > > > that are already assembled into `OpSendMsg` instead
> > > > > > > > > > > > > > of splitting the chunk message again.
> > > > > > > > > > > > > > So, there is no incompatibility issue of resending
> > > > > > > > > > > > > > the chunk message by splitting it again.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The logic of sending chunk messages can be found here:
> > > > > > > > > > > > > > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > The logic of resending the message can be found here:
> > > > > > > > > > > > > > https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > BR,
> > > > > > > > > > > > > > Xiangying
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn
> > > > > > > > > > > > > > <heesung.s...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I think brokers can track the last
> > > > > > > > > > > > > > > > > chunkMaxMessageSize for each producer.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Using different chunkMaxMessageSize is just one
> > > > > > > > > > > > > > > > of the aspects.
In > > > > > > > > > > > > > > > PIP-132 [0], we have included the message > > > > > > > > > > > > > > > metadata size > > > > > > when > > > > > > > > > > checking > > > > > > > > > > > > > > > maxMessageSize.The message metadata can be > > > > > > > > > > > > > > > changed after > > > > > > > > > > splitting > > > > > > > > > > > > the > > > > > > > > > > > > > > > chunks. We are still uncertain about the way the > > > > > > > > > > > > > > > chunked > > > > > > > > message > > > > > > > > > > is > > > > > > > > > > > > > > split, > > > > > > > > > > > > > > > even using the same ss chunkMaxMessageSize. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This sounds like we need to revisit chunking uuid > > > > > > > > > > > > > > > logic. > > > > > > > > > > > > > > > Like I commented here, > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/pulsar/pull/20948/files#r1305997883 > > > > > > > > > > > > > > > Why don't we add a chunk session id suffix to > > > > > > > > > > > > > > > identify > > > > > > the > > > > > > > > > > ongoing > > > > > > > > > > > > > > chunking > > > > > > > > > > > > > > > uniquely? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Currently, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > chunking uuid = producer + sequence_id > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Proposal > > > > > > > > > > > > > > > chunking uuid = producer + sequence_id + > > > > > > chunkingSessionId > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > * chunkingSessionId could be a timestamp when the > > > > > > chunking > > > > > > > > > > started. 
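The chunking-uuid proposal above can be illustrated with a small sketch. This is illustrative only: Pulsar's real uuid format may differ, and the helper names (`currentUuid`, `proposedUuid`) are hypothetical. The point is the extra per-session component: with the current scheme, a restarted producer reusing a sequence id produces a colliding uuid, while suffixing a chunking-session timestamp keeps the two chunking sessions distinct in `chunkedMessagesMap`:

```java
// Hypothetical sketch of the proposed chunking uuid scheme.
public class ChunkingUuid {
    // Current scheme: producer + sequence_id
    static String currentUuid(String producerName, long sequenceId) {
        return producerName + "-" + sequenceId;
    }

    // Proposed scheme: producer + sequence_id + chunkingSessionId,
    // where the session id could be the timestamp at which chunking started.
    static String proposedUuid(String producerName, long sequenceId,
                               long chunkingSessionId) {
        return producerName + "-" + sequenceId + "-" + chunkingSessionId;
    }

    public static void main(String[] args) {
        // A producer restarts mid-chunking and resends sequence id 0.
        String before = currentUuid("producer-1", 0);
        String after = currentUuid("producer-1", 0);
        System.out.println(before.equals(after)); // true: chunks of both
        // sends land in the same ChunkedMessageCtx

        String session1 = proposedUuid("producer-1", 0, 1693000000000L);
        String session2 = proposedUuid("producer-1", 0, 1693000005000L);
        System.out.println(session1.equals(session2)); // false: the sessions
        // stay separate, so stale chunks cannot corrupt the new context
    }
}
```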
> > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng
> > > > > > > > > > > > > > > <xiangy...@apache.org> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Zike,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > How would this happen to get two duplicated and
> > > > > > > > > > > > > > > > > consecutive ChunkID-1 messages? The producer
> > > > > > > > > > > > > > > > > should guarantee to retry the whole chunked
> > > > > > > > > > > > > > > > > messages instead of some parts of the chunks.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > If the producer guarantees to retry the whole
> > > > > > > > > > > > > > > > chunked messages instead of some parts of the
> > > > > > > > > > > > > > > > chunks, why does the bug of the producer
> > > > > > > > > > > > > > > > retrying chunk messages not appear in PR [0]?
> > > > > > > > > > > > > > > > And why do you need to set `chunkId` in
> > > > > > > > > > > > > > > > `op.rePopulate`? It would be reset when the
> > > > > > > > > > > > > > > > chunk messages were split again if the producer
> > > > > > > > > > > > > > > > guaranteed to retry the whole chunked messages.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > ```java
> > > > > > > > > > > > > > > > final MessageMetadata finalMsgMetadata = msgMetadata;
> > > > > > > > > > > > > > > > op.rePopulate = () -> {
> > > > > > > > > > > > > > > >     if (msgMetadata.hasChunkId()) {
> > > > > > > > > > > > > > > >         // The message metadata is shared between all
> > > > > > > > > > > > > > > >         // chunks in a large message.
> > > > > > > > > > > > > > > >         // We need to reset the chunk id for each call
> > > > > > > > > > > > > > > >         // of this method. It's safe to do that because
> > > > > > > > > > > > > > > >         // there is only 1 thread to manipulate this
> > > > > > > > > > > > > > > >         // message metadata.
> > > > > > > > > > > > > > > >         finalMsgMetadata.setChunkId(chunkId);
> > > > > > > > > > > > > > > >     }
> > > > > > > > > > > > > > > >     op.cmd = sendMessage(producerId, sequenceId,
> > > > > > > > > > > > > > > >             numMessages, messageId, finalMsgMetadata,
> > > > > > > > > > > > > > > >             encryptedPayload);
> > > > > > > > > > > > > > > > };
> > > > > > > > > > > > > > > > ```
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > But chunks 1, 2, 3, and 4 are still persisted
> > > > > > > > > > > > > > > > > > in the topic.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I think it's OK to persist them all on the
> > > > > > > > > > > > > > > > > topic. Is there any issue with doing that?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > This is just one scenario.
Whether only check > > > > > > > > > > > > > > > > the > > > > > > sequence > > > > > > > > ID > > > > > > > > > > of > > > > > > > > > > > > the > > > > > > > > > > > > > > > > first chunk (as I used in PR[1]) or check the > > > > > > > > > > > > > > > > sequence > > > > > > ID > > > > > > > > of > > > > > > > > > > the > > > > > > > > > > > > last > > > > > > > > > > > > > > > > chunk (as you suggested), in reality, neither > > > > > > > > > > > > > > > > of these > > > > > > > > methods > > > > > > > > > > can > > > > > > > > > > > > > > > > deduplicate chunks on the broker side because > > > > > > > > > > > > > > > > the > > > > > > broker > > > > > > > > cannot > > > > > > > > > > > > know > > > > > > > > > > > > > > > > the chunk ID of the previous message. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > However, if combined with the optimization of > > > > > > consumer-side > > > > > > > > > > logic, > > > > > > > > > > > > > > > > end-to-end deduplication can be completed. > > > > > > > > > > > > > > > > This is also a less-than-perfect solution I > > > > > > > > > > > > > > > > mentioned > > > > > > in my > > > > > > > > > > first > > > > > > > > > > > > > > > > email and implemented in PR[1]. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The reason I propose this proposal is not to > > > > > > > > > > > > > > > > solve the > > > > > > > > > > end-to-end > > > > > > > > > > > > > > > > deduplication of chunk messages between > > > > > > > > > > > > > > > > producers and > > > > > > > > > > consumers. > > > > > > > > > > > > That > > > > > > > > > > > > > > > > aspect has essentially been addressed in PR[1] > > > > > > > > > > > > > > > > and is > > > > > > still > > > > > > > > > > > > undergoing > > > > > > > > > > > > > > > > review. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > This proposal aims to ensure that no corrupt > > > > > > > > > > > > > > > > data > > > > > > exists > > > > > > > > > > within the > > > > > > > > > > > > > > > > topic, as our data might be offloaded and used > > > > > > elsewhere in > > > > > > > > > > > > scenarios > > > > > > > > > > > > > > > > where consumer logic is not optimized. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > > > > > > > Xiangying > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [0] https://github.com/apache/pulsar/pull/21048 > > > > > > > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang < > > > > > > z...@apache.org > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > HI xiangying > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The rewind operation is seen in the test > > > > > > > > > > > > > > > > > > log. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > That seems weird. Not sure if this rewind is > > > > > > > > > > > > > > > > > related > > > > > > to > > > > > > > > the > > > > > > > > > > chunk > > > > > > > > > > > > > > > > consuming. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > > > > > > 2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > > > > > > 3. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > > > > > > 4. SequenceID: 0, ChunkID: 2 > > > > > > > > > > > > > > > > > Such four chunks cannot be processed > > > > > > > > > > > > > > > > > correctly by the > > > > > > > > > > consumer. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > How would this happen to get two duplicated > > > > > > > > > > > > > > > > > and > > > > > > > > consecutive > > > > > > > > > > > > ChunkID-1 > > > > > > > > > > > > > > > > > messages? The producer should guarantee to > > > > > > > > > > > > > > > > > retry the > > > > > > > > whole > > > > > > > > > > > > chunked > > > > > > > > > > > > > > > > > messages instead of some parts of the chunks. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > But chunks 1, 2, 3, and 4 are still > > > > > > > > > > > > > > > > > > persisted in > > > > > > the > > > > > > > > topic. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I think it's OK to persist them all in the > > > > > > > > > > > > > > > > > topic. Is > > > > > > > > there > > > > > > > > > > any > > > > > > > > > > > > issue > > > > > > > > > > > > > > > > > with doing that? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > There is another point. The resend of the > > > > > > > > > > > > > > > > > > chunk > > > > > > message > > > > > > > > > > has a > > > > > > > > > > > > bug > > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > > I shared with you, and you fixed in PR [0]. > > > > > > > > > > > > > > > > > It will > > > > > > make > > > > > > > > this > > > > > > > > > > > > case > > > > > > > > > > > > > > > > > happen in another way. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > If the user sets the sequence ID manually, > > > > > > > > > > > > > > > > > the case > > > > > > > > could be > > > > > > > > > > > > > > reproduced. 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > BR, > > > > > > > > > > > > > > > > > Zike Yang > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying > > > > > > > > > > > > > > > > > Meng < > > > > > > > > > > > > xiangy...@apache.org > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >IIUC, this may change the existing > > > > > > > > > > > > > > > > > > >behavior and > > > > > > may > > > > > > > > > > introduce > > > > > > > > > > > > > > > > inconsistencies. > > > > > > > > > > > > > > > > > > >Suppose that we have a large message with 3 > > > > > > chunks. > > > > > > > > But > > > > > > > > > > the > > > > > > > > > > > > > > producer > > > > > > > > > > > > > > > > > > >crashes and resends the message after > > > > > > > > > > > > > > > > > > >sending the > > > > > > > > > > chunk-1. It > > > > > > > > > > > > will > > > > > > > > > > > > > > > > > > >send a total of 5 messages to the Pulsar > > > > > > > > > > > > > > > > > > >topic: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >1. SequenceID: 0, ChunkID: 0 > > > > > > > > > > > > > > > > > > >2. SequenceID: 0, ChunkID: 1 > > > > > > > > > > > > > > > > > > >3. SequenceID: 0, ChunkID: 0 -> This > > > > > > > > > > > > > > > > > > >message > > > > > > will be > > > > > > > > > > dropped > > > > > > > > > > > > > > > > > > >4. SequenceID: 0, ChunkID: 1 -> Will > > > > > > > > > > > > > > > > > > >also be > > > > > > > > dropped > > > > > > > > > > > > > > > > > > >5. SequenceID: 0, ChunkID: 2 -> The > > > > > > > > > > > > > > > > > > >last chunk > > > > > > of > > > > > > > > the > > > > > > > > > > > > message > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Zike > > > > > > > > > > > > > > > > > > There is another point. 
The resend of the chunk message has a bug that I shared with you, and that
you fixed in PR [0]. It will make this case happen in another way.

A brief description of the bug: because all chunks of a chunked message share
the same message metadata, if a chunk is not sent out immediately, then, when
resending, all chunks of the same chunked message use the chunk ID of the
last chunk. In this case, it would happen as:

1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
3. SequenceID: 0, ChunkID: 2 (Put op3 into `pendingMessages`)
4. SequenceID: 0, ChunkID: 2 (Resend op1)
5. SequenceID: 0, ChunkID: 2 (Resend op2)
6. SequenceID: 0, ChunkID: 2 (Send op3)

BR,
Xiangying

[0] https://github.com/apache/pulsar/pull/21048

On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:

>> This solution also cannot solve the out-of-order messages inside the
>> chunks. For example, the above five messages will still be persisted.
> The consumer already handles this case. The above 5 messages will all
> be persisted, but the consumer will skip messages 1 and 2.
> For messages 3, 4, and 5, the producer can guarantee these chunks are in order.

The rewind operation is seen in the test log.
Every time an incorrect chunk message is received, it will rewind; the code
has yet to be studied in depth. If it does not call rewind, then this case is
considered a workable case. Let's look at another case:

1. SequenceID: 0, ChunkID: 0
2. SequenceID: 0, ChunkID: 1
3. SequenceID: 0, ChunkID: 1
4. SequenceID: 0, ChunkID: 2

Such four chunks cannot be processed correctly by the consumer.

In fact, this solution is my original idea. The PR I mentioned in the first
email above uses a similar solution and modifies the logic on the consumer
side. Also, as I mentioned in the first email, this solution can only solve
the problem of end-to-end duplication.
But chunks 1, 2, 3, and 4 are still persisted in the topic.

On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:

Hi Heesung,

> I believe in this PIP "similar to the existing 'sequence ID map', to
> facilitate effective filtering" actually means tracking the last
> chunkId (not all chunk ids) on the broker side.

With this simple solution, I think we don't need to track the (sequenceID,
chunkID) on the broker side at all. The broker just needs to apply the
deduplication logic to the last chunk instead of all previous chunks.
This PIP actually could do that, but it will introduce a new data format and
compatibility issue.

> This is still a behavior change (deduping chunk messages on the broker),
> and I believe we need to discuss this addition as a PIP.

Actually, we didn't specifically state the deduping chunk message behavior
before. The chunked message should be equally applicable to the
de-duplication logic as a regular message. Therefore, I think it should be
considered as a bug fix. But if this fix is worth discussing in depth, I have
no objection to it being a new PIP.
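A minimal sketch of this last-chunk-only idea (hypothetical, heavily simplified broker-side logic; names such as `shouldPersist` and `highestSequencePersisted` are illustrative, not Pulsar's actual internals):

```java
import java.util.HashMap;
import java.util.Map;

public class LastChunkDedupSketch {
    // Per-producer highest sequence id whose last chunk was persisted.
    private final Map<String, Long> highestSequencePersisted = new HashMap<>();

    /** Returns true if the chunk should be persisted, false if it is deduplicated. */
    public boolean shouldPersist(String producerName, long sequenceId,
                                 int chunkId, int numChunks) {
        if (chunkId != numChunks - 1) {
            return true; // intermediate chunks bypass the dedup check
        }
        long highest = highestSequencePersisted.getOrDefault(producerName, -1L);
        if (sequenceId <= highest) {
            return false; // the whole large message was already persisted
        }
        highestSequencePersisted.put(producerName, sequenceId);
        return true;
    }

    public static void main(String[] args) {
        LastChunkDedupSketch broker = new LastChunkDedupSketch();
        // The 5-message resend scenario: every chunk is persisted, because
        // only message 5 (the last chunk) is subject to the dedup check.
        for (int chunkId : new int[] {0, 1, 0, 1, 2}) {
            System.out.println("persist=" + broker.shouldPersist("p", 0, chunkId, 3));
        }
        // A later full resend of sequence 0 is rejected at its last chunk.
        System.out.println("persist=" + broker.shouldPersist("p", 0, 2, 3));
    }
}
```

Under this scheme the duplicate intermediate chunks still reach the topic, which matches the point made elsewhere in this thread that the consumer, not the broker, is the one skipping them.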
> I think brokers can track the last chunkMaxMessageSize for each producer.

Using different chunkMaxMessageSize is just one of the aspects. In PIP-132
[0], we have included the message metadata size when checking maxMessageSize.
The message metadata can be changed after splitting the chunks. We are still
uncertain about the way the chunked message is split, even using the same
chunkMaxMessageSize.
> then the brokers can assume that the producer is resending the chunks from
> the beginning with a different scheme (restarted with a different
> chunkMaxMessageSize) and accept those new chunks from the beginning.

Regarding this, it seems like we are implementing dynamic configuration for
the chunkMaxMessageSize. I'm afraid that this would change the expected
behavior and introduce more complexity to this configuration.
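To illustrate why the same payload may split differently across sends, here is a rough sketch of the chunk-count arithmetic. The formula is a simplification for illustration only; Pulsar's actual accounting (per PIP-132) includes the metadata size when checking maxMessageSize, which is exactly the point being made above:

```java
public class ChunkSplitSketch {
    /** Ceiling division of the payload over the per-chunk payload budget. */
    static int numChunks(int payloadSize, int maxMessageSize, int metadataSize) {
        int payloadPerChunk = maxMessageSize - metadataSize;
        return (payloadSize + payloadPerChunk - 1) / payloadPerChunk;
    }

    public static void main(String[] args) {
        // Same 1000-byte payload, same maxMessageSize, but larger metadata on
        // the resend leaves less room per chunk, so the split differs:
        System.out.println(numChunks(1000, 512, 12));  // 500 bytes/chunk -> 2 chunks
        System.out.println(numChunks(1000, 512, 112)); // 400 bytes/chunk -> 3 chunks
    }
}
```

Two sends that split into different chunk counts can never be lined up chunk-by-chunk, which is why mixing chunks from two sends corrupts the assembled message.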
[0] https://github.com/apache/pulsar/pull/14007

BR,
Zike Yang

On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:

Hi, xiangying

> it will find that the message is out of order and rewind the cursor.
> Loop this operation, and discard this message after it expires instead
> of assembling 3, 4, 5 into a message.

Could you point out where the implementation for this is? From my
understanding, there should not be any rewind operation for the chunking
feature.
You can check more detail here:
https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented

> This solution also cannot solve the out-of-order messages inside the
> chunks. For example, the above five messages will still be persisted.

The consumer already handles this case. The above 5 messages will all be
persisted, but the consumer will skip messages 1 and 2. For messages 3, 4,
and 5, the producer can guarantee these chunks are in order.
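The assembly behavior described here can be modeled with a simplified sketch of the consumer's chunk tracking (hypothetical code, not the real `ConsumerImpl#processMessageChunk`): chunk 0 always starts a fresh context, and any other chunk that is not `lastChunkId + 1` drops the in-progress context.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkAssemblySketch {
    private List<Integer> ctx;   // chunk ids accumulated so far, null if none
    private int lastChunkId = -1;

    /** Returns the assembled chunk ids when the message completes, else null. */
    public List<Integer> receive(int chunkId, int totalChunks) {
        if (chunkId == 0) {
            ctx = new ArrayList<>();          // first chunk (re)starts the context
        } else if (ctx == null || chunkId != lastChunkId + 1) {
            ctx = null;                       // out-of-order chunk: drop everything
            return null;
        }
        ctx.add(chunkId);
        lastChunkId = chunkId;
        if (chunkId == totalChunks - 1) {
            List<Integer> assembled = ctx;
            ctx = null;
            return assembled;
        }
        return null;
    }

    public static void main(String[] args) {
        // The resend scenario 0,1,0,1,2: messages 1 and 2 are discarded when
        // the duplicate chunk 0 restarts the context; 3,4,5 are assembled.
        ChunkAssemblySketch consumer = new ChunkAssemblySketch();
        for (int chunkId : new int[] {0, 1, 0, 1, 2}) {
            List<Integer> assembled = consumer.receive(chunkId, 3);
            if (assembled != null) {
                System.out.println("assembled " + assembled);
            }
        }
    }
}
```

The same model shows the earlier 0, 1, 1, 2 sequence failing: the duplicate chunk 1 drops the context, and chunk 2 then has no context to join, so the message is never assembled.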
BR,
Zike Yang

On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
<yubiao.f...@streamnative.io.invalid> wrote:

> 1. SequenceID: 0, ChunkID: 0
> 2. SequenceID: 0, ChunkID: 1
> 3. SequenceID: 0, ChunkID: 0
> 4. SequenceID: 0, ChunkID: 1
> 5. SequenceID: 0, ChunkID: 2
> For the existing behavior, the consumer assembles messages 3,4,5 into
> the original large message. But the changes brought about by this PIP
> will cause the consumer to use messages 1,2,5 for assembly. There is
> no guarantee that the producer will split the message in the same way
> twice before and after.
> For example, the producer's maxMessageSize may be different. This may
> cause the consumer to receive a corrupt message.

Good point.

Thanks
Yubiao Feng

On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:

Hi, xiangying,

Thanks for your PIP.

IIUC, this may change the existing behavior and may introduce
inconsistencies. Suppose that we have a large message with 3 chunks. But the
producer crashes and resends the message after sending the chunk-1.
It will send a total of 5 messages to the Pulsar topic:

1. SequenceID: 0, ChunkID: 0
2. SequenceID: 0, ChunkID: 1
3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message

For the existing behavior, the consumer assembles messages 3,4,5 into the
original large message. But the changes brought about by this PIP will cause
the consumer to use messages 1,2,5 for assembly.
There is no guarantee that the producer will split the message in the same
way twice before and after. For example, the producer's maxMessageSize may be
different. This may cause the consumer to receive a corrupt message.

Also, this PIP increases the complexity of handling chunks on the broker
side. Brokers should, in general, treat a chunk as a normal message.

I think a simpler, better approach is to only check deduplication for the
last chunk of the large message.
The consumer only gets the whole message after receiving the last chunk. We
don't need to check deduplication for all previous chunks. Also, by doing
this we only need bug fixes; we don't need to introduce a new PIP.

BR,
Zike Yang

On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <xiangy...@apache.org> wrote:

Dear Community,

I hope this email finds you well. I'd like to address an important issue
related to Apache Pulsar and discuss a solution I've proposed on GitHub.
The problem pertains to the handling of Chunk Messages after enabling
deduplication.

In the current version of Apache Pulsar, all chunks of a Chunk Message share
the same sequence ID. However, enabling the deduplication feature results in
an inability to send Chunk Messages. To tackle this problem, I've proposed a
solution [1] that ensures messages are not duplicated throughout end-to-end
delivery. While this fix addresses the duplication issue for end-to-end
messages, there remains a possibility of duplicate chunks within topics.
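For context, the failure mode is roughly this (hypothetical, heavily simplified dedup logic, not the actual broker code): standard deduplication keys on the producer's sequence ID alone, and every chunk of a chunked message carries the same sequence ID, so every chunk after the first looks like a duplicate.

```java
import java.util.HashMap;
import java.util.Map;

public class DedupBreaksChunkingSketch {
    private final Map<String, Long> highestSequencePersisted = new HashMap<>();

    /** Classic per-sequence-id dedup, oblivious to chunking. */
    public boolean isDuplicate(String producerName, long sequenceId) {
        long highest = highestSequencePersisted.getOrDefault(producerName, -1L);
        if (sequenceId <= highest) {
            return true;
        }
        highestSequencePersisted.put(producerName, sequenceId);
        return false;
    }

    public static void main(String[] args) {
        DedupBreaksChunkingSketch broker = new DedupBreaksChunkingSketch();
        // Three chunks of one large message, all carrying sequenceId 0:
        System.out.println(broker.isDuplicate("p", 0)); // chunk 0 accepted
        System.out.println(broker.isDuplicate("p", 0)); // chunk 1 wrongly dropped
        System.out.println(broker.isDuplicate("p", 0)); // chunk 2 wrongly dropped
    }
}
```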
To address this concern, I believe we should introduce a "Chunk ID map" at
the Broker level, similar to the existing "sequence ID map", to facilitate
effective filtering. However, implementing this has led to a challenge: a
producer requires storage for two Long values simultaneously (sequence ID and
chunk ID).
Because the snapshot of the sequence ID map is stored through the properties
of the cursor (Map<String, Long>), storing two Longs (sequence ID and chunk
ID) for one producer is not possible with that field as-is. To resolve this,
I've proposed an alternative proposal [2] involving the introduction of a
"mark DeleteProperties" (Map<String, String>) field to replace the current
properties (Map<String, Long>) field.
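One way to see why Map<String, String> suffices where Map<String, Long> does not: two longs can be packed into a single string value per producer. The encoding below is purely illustrative and is not what PR [2] actually stores:

```java
import java.util.HashMap;
import java.util.Map;

public class CursorPropertiesSketch {
    static String encode(long sequenceId, long chunkId) {
        return sequenceId + ":" + chunkId;
    }

    static long[] decode(String value) {
        String[] parts = value.split(":");
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }

    public static void main(String[] args) {
        // Map<String, String> can carry both values under one producer key...
        Map<String, String> markDeleteProperties = new HashMap<>();
        markDeleteProperties.put("producer-1", encode(42L, 2L));
        long[] state = decode(markDeleteProperties.get("producer-1"));
        System.out.println("sequenceId=" + state[0] + " chunkId=" + state[1]);
        // ...whereas Map<String, Long> forces a single Long per key.
    }
}
```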
I'd appreciate it if you would carefully review both PRs and share your
valuable feedback and insights. Thank you immensely for your time and
attention. I eagerly anticipate your valuable opinions and recommendations.

Warm regards,
Xiangying

[1] https://github.com/apache/pulsar/pull/20948
[2] https://github.com/apache/pulsar/pull/21027