Hi Heesung

It is a good idea to cover this incompatibility case if the producer
splits the chunk message again when retrying.

But in fact, the producer only resents the chunks that are assembled
to `OpSendMsg` instead of splitting the chunk message again.
So, there is no incompatibility issue of resenting the chunk message
by splitting the chunk message again.

The logic of sending chunk messages can be found here:
https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L533

The logic of resending the message can be found here:
https://github.com/apache/pulsar/blob/e0c481e5f8d7fa5534d3327785928a234376789e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1892

BR,
Xiangying







On Sat, Aug 26, 2023 at 2:24 AM Heesung Sohn
<heesung.s...@streamnative.io.invalid> wrote:
>
> >> I think brokers can track the last chunkMaxMessageSize for each producer.
>
> > Using different chunkMaxMessageSize is just one of the aspects. In
> PIP-132 [0], we have included the message metadata size when checking
> maxMessageSize.The message metadata can be changed after splitting the
> chunks. We are still uncertain about the way the chunked message is split,
> even using the same ss chunkMaxMessageSize.
>
> This sounds like we need to revisit chunking uuid logic.
> Like I commented here,
> https://github.com/apache/pulsar/pull/20948/files#r1305997883
> Why don't we add a chunk session id suffix to identify the ongoing chunking
> uniquely?
>
> Currently,
>
> chunking uuid = producer + sequence_id
>
> Proposal
> chunking  uuid = producer + sequence_id + chunkingSessionId
>
> * chunkingSessionId could be a timestamp when the chunking started.
>
>
>
> On Fri, Aug 25, 2023 at 6:02 AM Xiangying Meng <xiangy...@apache.org> wrote:
>
> > Hi Zike,
> >
> > >How would this happen to get two duplicated and consecutive ChunkID-1
> > >messages? The producer should guarantee to retry the whole chunked
> > >messages instead of some parts of the chunks.
> >
> > If the producer guarantees to retry the whole chunked messages instead
> > of some parts of the chunks,
> > Why doesn't the bug of the producer retry chunk messages in the PR [0]
> > appear?
> > And why do you need to set `chunkId` in `op.rePopulate`?
> > It will be rested when split chunk messages again if the producer
> > guarantees to retry the whole chunked messages.
> > ```
> > final MessageMetadata finalMsgMetadata = msgMetadata;
> > op.rePopulate = () -> {
> > if (msgMetadata.hasChunkId()) {
> > // The message metadata is shared between all chunks in a large message
> > // We need to reset the chunk id for each call of this method
> > // It's safe to do that because there is only 1 thread to manipulate
> > this message metadata
> > finalMsgMetadata.setChunkId(chunkId);
> > }
> > op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId,
> > finalMsgMetadata,
> > encryptedPayload);
> > };
> >
> > ```
> >
> > >> But chunks 1, 2, 3, and 4 are still persisted in the topic.
> > >
> > >I think it's OK to persist them all on the topic. Is there any issue
> > >with doing that?
> >
> > This is just one scenario. Whether only check the sequence ID of the
> > first chunk (as I used in PR[1]) or check the sequence ID of the last
> > chunk (as you suggested), in reality, neither of these methods can
> > deduplicate chunks on the broker side because the broker cannot know
> > the chunk ID of the previous message.
> >
> > However, if combined with the optimization of consumer-side logic,
> > end-to-end deduplication can be completed.
> > This is also a less-than-perfect solution I mentioned in my first
> > email and implemented in PR[1].
> >
> > The reason I propose this proposal is not to solve the end-to-end
> > deduplication of chunk messages between producers and consumers. That
> > aspect has essentially been addressed in PR[1] and is still undergoing
> > review.
> >
> > This proposal aims to ensure that no corrupt data exists within the
> > topic, as our data might be offloaded and used elsewhere in scenarios
> > where consumer logic is not optimized.
> >
> > BR,
> > Xiangying
> >
> > [0] https://github.com/apache/pulsar/pull/21048
> > [1] https://github.com/apache/pulsar/pull/20948
> >
> > On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org> wrote:
> > >
> > > HI xiangying
> > >
> > > > The rewind operation is seen in the test log.
> > >
> > > That seems weird. Not sure if this rewind is related to the chunk
> > consuming.
> > >
> > > > 1. SequenceID: 0, ChunkID: 0
> > > 2. SequenceID: 0, ChunkID: 1
> > > 3. SequenceID: 0, ChunkID: 1
> > > 4. SequenceID: 0, ChunkID: 2
> > > Such four chunks cannot be processed correctly by the consumer.
> > >
> > > How would this happen to get two duplicated and consecutive ChunkID-1
> > > messages? The producer should guarantee to retry the whole chunked
> > > messages instead of some parts of the chunks.
> > >
> > > > But chunks 1, 2, 3, and 4 are still persisted in the topic.
> > >
> > > I think it's OK to persist them all in the topic. Is there any issue
> > > with doing that?
> > >
> > > > There is another point. The resend of the chunk message has a bug that
> > > I shared with you, and you fixed in PR [0]. It will make this case
> > > happen in another way.
> > >
> > > If the user sets the sequence ID manually, the case could be reproduced.
> > >
> > > BR,
> > > Zike Yang
> > >
> > > On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org>
> > wrote:
> > > >
> > > > >IIUC, this may change the existing behavior and may introduce
> > inconsistencies.
> > > > >Suppose that we have a large message with 3 chunks. But the producer
> > > > >crashes and resends the message after sending the chunk-1. It will
> > > > >send a total of 5 messages to the Pulsar topic:
> > > > >
> > > > >1. SequenceID: 0, ChunkID: 0
> > > > >2. SequenceID: 0, ChunkID: 1
> > > > >3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > >4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > > > >5. SequenceID: 0, ChunkID: 2    -> The last chunk of the message
> > > >
> > > > Hi Zike
> > > > There is another point. The resend of the chunk message has a bug that
> > > > I shared with you, and you fixed in PR [0]. It will make this case
> > > > happen in another way.
> > > > Sample description for the  bug:
> > > > Because the chunk message uses the same message metadata, if the chunk
> > > > is not sent out immediately. Then, when resending, all chunks of the
> > > > same chunk message use the chunk ID of the last chunk.
> > > > In this case, It should happen as:
> > > > 1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
> > > > 2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
> > > > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into `pendingMessages`)
> > > > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> > > > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> > > > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
> > > >
> > > >
> > > > BR,
> > > > Xiangying
> > > >
> > > > [0] - https://github.com/apache/pulsar/pull/21048
> > > >
> > > > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org>
> > wrote:
> > > > >
> > > > > >> This solution also cannot solve the out-of-order messages inside
> > the
> > > > > >>chunks. For example, the above five messages will still be
> > persisted.
> > > > > >The consumer already handles this case. The above 5 messages will
> > all
> > > > > >be persisted but the consumer will skip message 1 and 2.
> > > > > >For messages 3, 4, and 5. The producer can guarantee these chunks
> > are in order.
> > > > >
> > > > > The rewind operation is seen in the test log. Every time an incorrect
> > > > > chunk message is received, it will rewind, and the code has yet to be
> > > > > studied in depth.
> > > > > If it does not call rewind, then this case is considered a workable
> > > > > case. Let's look at another case.
> > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > 3. SequenceID: 0, ChunkID: 1
> > > > > 4. SequenceID: 0, ChunkID: 2
> > > > > Such four chunks cannot be processed correctly by the consumer.
> > > > >
> > > > > In fact, this solution is my original idea. The PR I mentioned in the
> > > > > first email above uses a similar solution and modifies the logic on
> > > > > the consumer side.
> > > > > Also, as I mentioned in the first email, this solution can only solve
> > > > > the problem of end-to-end duplication. But chunks 1, 2, 3, and 4 are
> > > > > still persisted in the topic.
> > > > >
> > > > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
> > > > > >
> > > > > > Hi Heesung,
> > > > > >
> > > > > > > I believe in this PIP "similar to the existing "sequence ID map",
> > > > > > to facilitate effective filtering" actually means tracking the last
> > > > > > chunkId(not all chunk ids) on the broker side.
> > > > > >
> > > > > > With this simple solution, I think we don't need to track the
> > > > > > (sequenceID, chunkID) on the broker side at all. The broker just
> > needs
> > > > > > to apply the deduplication logic to the last chunk instead of all
> > > > > > previous chunks. This PIP actually could do that, but it will
> > > > > > introduce a new data format and compatibility issue.
> > > > > >
> > > > > > > This is still a behavior change(deduping chunk messages on the
> > broker),
> > > > > > and I believe we need to discuss this addition as a PIP.
> > > > > >
> > > > > > Actually, we didn't specifically state the deduping chunk message
> > > > > > behavior before. The chunked message should be equally applicable
> > to
> > > > > > the de-duplication logic as a regular message. Therefore, I think
> > it
> > > > > > should be considered as a bug fix. But if this FIX is worth
> > discussing
> > > > > > in depth. I have no objection to it being a new PIP.
> > > > > >
> > > > > > > I think brokers can track the last chunkMaxMessageSize for
> > > > > > each producer.
> > > > > >
> > > > > > Using different chunkMaxMessageSize is just one of the aspects. In
> > > > > > PIP-132 [0], we have included the message metadata size when
> > checking
> > > > > > maxMessageSize.
> > > > > > The message metadata can be changed after splitting the chunks. We
> > are
> > > > > > still uncertain about the way the chunked message is split, even
> > using
> > > > > > the same ss chunkMaxMessageSize.
> > > > > >
> > > > > > > then the brokers can assume that the producer is resending the
> > chunks from
> > > > > > the beginning with a different scheme(restarted with a different
> > > > > > chunkMaxMessageSize) and accept those new chunks from the
> > beginning.
> > > > > >
> > > > > > Regarding this, it seems like we are implementing dynamic
> > > > > > configuration for the chunkMaxMessageSize. I'm afraid that this
> > would
> > > > > > change the expected behavior and introduce more complexity to this
> > > > > > configuration.
> > > > > >
> > > > > >
> > > > > > [0] https://github.com/apache/pulsar/pull/14007
> > > > > >
> > > > > > BR,
> > > > > > Zike Yang
> > > > > >
> > > > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi, xiangying
> > > > > > >
> > > > > > > > it will find that the message
> > > > > > > is out of order and rewind the cursor. Loop this operation, and
> > > > > > > discard this message after it expires instead of assembling 3,
> > 4, 5
> > > > > > > into a message.
> > > > > > >
> > > > > > > Could you point out where the implementation for this?  From my
> > > > > > > understanding, there should not be any rewind operation for the
> > > > > > > chunking feature. You can check more detail here:
> > > > > > >
> > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > > > > >
> > > > > > > > This solution also cannot solve the out-of-order messages
> > inside the
> > > > > > > chunks. For example, the above five messages will still be
> > persisted.
> > > > > > >
> > > > > > > The consumer already handles this case. The above 5 messages
> > will all
> > > > > > > be persisted but the consumer will skip message 1 and 2.
> > > > > > > For messages 3, 4, and 5. The producer can guarantee these
> > chunks are in order.
> > > > > > >
> > > > > > > BR,
> > > > > > > Zike Yang
> > > > > > >
> > > > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > > > For the existing behavior, the consumer assembles
> > > > > > > > > messages 3,4,5 into
> > > > > > > > > the original large message. But the changes brought
> > > > > > > > > about by this PIP
> > > > > > > > > will cause the consumer to use messages 1,2,5 for
> > > > > > > > > assembly. There is
> > > > > > > > > no guarantee that the producer will split the message
> > > > > > > > > in the same way
> > > > > > > > > twice before and after. For example, the producer's
> > > > > > > > > maxMessageSize may
> > > > > > > > > be different. This may cause the consumer to
> > > > > > > > > receive a corrupt
> > > > > > > > > message.
> > > > > > > >
> > > > > > > > Good point.
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Yubiao Feng
> > > > > > > >
> > > > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org>
> > wrote:
> > > > > > > >
> > > > > > > > > Hi, xiangying,
> > > > > > > > >
> > > > > > > > > Thanks for your PIP.
> > > > > > > > >
> > > > > > > > > IIUC, this may change the existing behavior and may introduce
> > > > > > > > > inconsistencies.
> > > > > > > > > Suppose that we have a large message with 3 chunks. But the
> > producer
> > > > > > > > > crashes and resends the message after sending the chunk-1.
> > It will
> > > > > > > > > send a total of 5 messages to the Pulsar topic:
> > > > > > > > >
> > > > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > > > 3. SequenceID: 0, ChunkID: 0   -> This message will be
> > dropped
> > > > > > > > > 4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > > > > > > > > 5. SequenceID: 0, ChunkID: 2    -> The last chunk of the
> > message
> > > > > > > > >
> > > > > > > > > For the existing behavior, the consumer assembles messages
> > 3,4,5 into
> > > > > > > > > the original large message. But the changes brought about by
> > this PIP
> > > > > > > > > will cause the consumer to use messages 1,2,5 for assembly.
> > There is
> > > > > > > > > no guarantee that the producer will split the message in the
> > same way
> > > > > > > > > twice before and after. For example, the producer's
> > maxMessageSize may
> > > > > > > > > be different. This may cause the consumer to receive a
> > corrupt
> > > > > > > > > message.
> > > > > > > > >
> > > > > > > > > Also, this PIP increases the complexity of handling chunks
> > on the
> > > > > > > > > broker side. Brokers should, in general, treat the chunk as
> > a normal
> > > > > > > > > message.
> > > > > > > > >
> > > > > > > > > I think a simple better approach is to only check the
> > deduplication
> > > > > > > > > for the last chunk of the large message. The consumer only
> > gets the
> > > > > > > > > whole message after receiving the last chunk. We don't need
> > to check
> > > > > > > > > the deduplication for all previous chunks. Also by doing
> > this we only
> > > > > > > > > need bug fixes, we don't need to introduce a new PIP.
> > > > > > > > >
> > > > > > > > > BR,
> > > > > > > > > Zike Yang
> > > > > > > > >
> > > > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng <
> > xiangy...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > Dear Community,
> > > > > > > > > >
> > > > > > > > > > I hope this email finds you well. I'd like to address an
> > important
> > > > > > > > > > issue related to Apache Pulsar and discuss a solution I've
> > proposed on
> > > > > > > > > > GitHub. The problem pertains to the handling of Chunk
> > Messages after
> > > > > > > > > > enabling deduplication.
> > > > > > > > > >
> > > > > > > > > > In the current version of Apache Pulsar, all chunks of a
> > Chunk Message
> > > > > > > > > > share the same sequence ID. However, enabling the
> > depublication
> > > > > > > > > > feature results in an inability to send Chunk Messages. To
> > tackle this
> > > > > > > > > > problem, I've proposed a solution [1] that ensures
> > messages are not
> > > > > > > > > > duplicated throughout end-to-end delivery. While this fix
> > addresses
> > > > > > > > > > the duplication issue for end-to-end messages, there
> > remains a
> > > > > > > > > > possibility of duplicate chunks within topics.
> > > > > > > > > >
> > > > > > > > > > To address this concern, I believe we should introduce a
> > "Chunk ID
> > > > > > > > > > map" at the Broker level, similar to the existing
> > "sequence ID map",
> > > > > > > > > > to facilitate effective filtering. However, implementing
> > this has led
> > > > > > > > > > to a challenge: a producer requires storage for two Long
> > values
> > > > > > > > > > simultaneously (sequence ID and chunk ID). Because the
> > snapshot of the
> > > > > > > > > > sequence ID map is stored through the properties of the
> > cursor
> > > > > > > > > > (Map<String, Long>), so in order to satisfy the storage of
> > two Longs
> > > > > > > > > > (sequence ID, chunk ID) corresponding to one producer, we
> > hope to add
> > > > > > > > > > a mark DeleteProperties (Map<String, Long>) String,
> > String>) to
> > > > > > > > > > replace the properties (Map<String, Long>) field. To
> > resolve this,
> > > > > > > > > > I've proposed an alternative proposal [2] involving the
> > introduction
> > > > > > > > > > of a "mark DeleteProperties" (Map<String, String>) to
> > replace the
> > > > > > > > > > current properties (Map<String, Long>) field.
> > > > > > > > > >
> > > > > > > > > > I'd appreciate it if you carefully review both PRs and
> > share your
> > > > > > > > > > valuable feedback and insights. Thank you immensely for
> > your time and
> > > > > > > > > > attention. I eagerly anticipate your valuable opinions and
> > > > > > > > > > recommendations.
> > > > > > > > > >
> > > > > > > > > > Warm regards,
> > > > > > > > > > Xiangying
> > > > > > > > > >
> > > > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > > > > [2] https://github.com/apache/pulsar/pull/21027
> > > > > > > > >
> >

Reply via email to