Hi Zike,

>How would this happen to get two duplicated and consecutive ChunkID-1
>messages? The producer should guarantee to retry the whole chunked
>messages instead of some parts of the chunks.

If the producer guarantees to retry the whole chunked messages instead
of some parts of the chunks,
Why doesn't the bug of the producer retry chunk messages in the PR [0] appear?
And why do you need to set `chunkId` in `op.rePopulate`?
It will be rested when split chunk messages again if the producer
guarantees to retry the whole chunked messages.
```
final MessageMetadata finalMsgMetadata = msgMetadata;
op.rePopulate = () -> {
if (msgMetadata.hasChunkId()) {
// The message metadata is shared between all chunks in a large message
// We need to reset the chunk id for each call of this method
// It's safe to do that because there is only 1 thread to manipulate
this message metadata
finalMsgMetadata.setChunkId(chunkId);
}
op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId,
finalMsgMetadata,
encryptedPayload);
};

```

>> But chunks 1, 2, 3, and 4 are still persisted in the topic.
>
>I think it's OK to persist them all on the topic. Is there any issue
>with doing that?

This is just one scenario. Whether only check the sequence ID of the
first chunk (as I used in PR[1]) or check the sequence ID of the last
chunk (as you suggested), in reality, neither of these methods can
deduplicate chunks on the broker side because the broker cannot know
the chunk ID of the previous message.

However, if combined with the optimization of consumer-side logic,
end-to-end deduplication can be completed.
This is also a less-than-perfect solution I mentioned in my first
email and implemented in PR[1].

The reason I propose this proposal is not to solve the end-to-end
deduplication of chunk messages between producers and consumers. That
aspect has essentially been addressed in PR[1] and is still undergoing
review.

This proposal aims to ensure that no corrupt data exists within the
topic, as our data might be offloaded and used elsewhere in scenarios
where consumer logic is not optimized.

BR,
Xiangying

[0] https://github.com/apache/pulsar/pull/21048
[1] https://github.com/apache/pulsar/pull/20948

On Fri, Aug 25, 2023 at 5:18 PM Zike Yang <z...@apache.org> wrote:
>
> HI xiangying
>
> > The rewind operation is seen in the test log.
>
> That seems weird. Not sure if this rewind is related to the chunk consuming.
>
> > 1. SequenceID: 0, ChunkID: 0
> 2. SequenceID: 0, ChunkID: 1
> 3. SequenceID: 0, ChunkID: 1
> 4. SequenceID: 0, ChunkID: 2
> Such four chunks cannot be processed correctly by the consumer.
>
> How would this happen to get two duplicated and consecutive ChunkID-1
> messages? The producer should guarantee to retry the whole chunked
> messages instead of some parts of the chunks.
>
> > But chunks 1, 2, 3, and 4 are still persisted in the topic.
>
> I think it's OK to persist them all in the topic. Is there any issue
> with doing that?
>
> > There is another point. The resend of the chunk message has a bug that
> I shared with you, and you fixed in PR [0]. It will make this case
> happen in another way.
>
> If the user sets the sequence ID manually, the case could be reproduced.
>
> BR,
> Zike Yang
>
> On Thu, Aug 24, 2023 at 8:48 PM Xiangying Meng <xiangy...@apache.org> wrote:
> >
> > >IIUC, this may change the existing behavior and may introduce 
> > >inconsistencies.
> > >Suppose that we have a large message with 3 chunks. But the producer
> > >crashes and resends the message after sending the chunk-1. It will
> > >send a total of 5 messages to the Pulsar topic:
> > >
> > >1. SequenceID: 0, ChunkID: 0
> > >2. SequenceID: 0, ChunkID: 1
> > >3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > >4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > >5. SequenceID: 0, ChunkID: 2    -> The last chunk of the message
> >
> > Hi Zike
> > There is another point. The resend of the chunk message has a bug that
> > I shared with you, and you fixed in PR [0]. It will make this case
> > happen in another way.
> > Sample description for the  bug:
> > Because the chunk message uses the same message metadata, if the chunk
> > is not sent out immediately. Then, when resending, all chunks of the
> > same chunk message use the chunk ID of the last chunk.
> > In this case, It should happen as:
> > 1. SequenceID: 0, ChunkID: 0 (Put op1 into `pendingMessages` and send)
> > 2. SequenceID: 0, ChunkID: 1 (Put op2 into `pendingMessages` and send)
> > 3. SequenceID: 0, ChunkID: 2   -> (Put op3 into `pendingMessages`)
> > 4. SequenceID: 0, ChunkID: 2   -> (Resend op1)
> > 5. SequenceID: 0, ChunkID: 2   -> (Resend op2)
> > 6. SequenceID: 0, ChunkID: 2   -> (Send op3)
> >
> >
> > BR,
> > Xiangying
> >
> > [0] - https://github.com/apache/pulsar/pull/21048
> >
> > On Thu, Aug 24, 2023 at 8:09 PM Xiangying Meng <xiangy...@apache.org> wrote:
> > >
> > > >> This solution also cannot solve the out-of-order messages inside the
> > > >>chunks. For example, the above five messages will still be persisted.
> > > >The consumer already handles this case. The above 5 messages will all
> > > >be persisted but the consumer will skip message 1 and 2.
> > > >For messages 3, 4, and 5. The producer can guarantee these chunks are in 
> > > >order.
> > >
> > > The rewind operation is seen in the test log. Every time an incorrect
> > > chunk message is received, it will rewind, and the code has yet to be
> > > studied in depth.
> > > If it does not call rewind, then this case is considered a workable
> > > case. Let's look at another case.
> > > 1. SequenceID: 0, ChunkID: 0
> > > 2. SequenceID: 0, ChunkID: 1
> > > 3. SequenceID: 0, ChunkID: 1
> > > 4. SequenceID: 0, ChunkID: 2
> > > Such four chunks cannot be processed correctly by the consumer.
> > >
> > > In fact, this solution is my original idea. The PR I mentioned in the
> > > first email above uses a similar solution and modifies the logic on
> > > the consumer side.
> > > Also, as I mentioned in the first email, this solution can only solve
> > > the problem of end-to-end duplication. But chunks 1, 2, 3, and 4 are
> > > still persisted in the topic.
> > >
> > > On Thu, Aug 24, 2023 at 3:00 PM Zike Yang <z...@apache.org> wrote:
> > > >
> > > > Hi Heesung,
> > > >
> > > > > I believe in this PIP "similar to the existing "sequence ID map",
> > > > to facilitate effective filtering" actually means tracking the last
> > > > chunkId(not all chunk ids) on the broker side.
> > > >
> > > > With this simple solution, I think we don't need to track the
> > > > (sequenceID, chunkID) on the broker side at all. The broker just needs
> > > > to apply the deduplication logic to the last chunk instead of all
> > > > previous chunks. This PIP actually could do that, but it will
> > > > introduce a new data format and compatibility issue.
> > > >
> > > > > This is still a behavior change(deduping chunk messages on the 
> > > > > broker),
> > > > and I believe we need to discuss this addition as a PIP.
> > > >
> > > > Actually, we didn't specifically state the deduping chunk message
> > > > behavior before. The chunked message should be equally applicable to
> > > > the de-duplication logic as a regular message. Therefore, I think it
> > > > should be considered as a bug fix. But if this FIX is worth discussing
> > > > in depth. I have no objection to it being a new PIP.
> > > >
> > > > > I think brokers can track the last chunkMaxMessageSize for
> > > > each producer.
> > > >
> > > > Using different chunkMaxMessageSize is just one of the aspects. In
> > > > PIP-132 [0], we have included the message metadata size when checking
> > > > maxMessageSize.
> > > > The message metadata can be changed after splitting the chunks. We are
> > > > still uncertain about the way the chunked message is split, even using
> > > > the same ss chunkMaxMessageSize.
> > > >
> > > > > then the brokers can assume that the producer is resending the chunks 
> > > > > from
> > > > the beginning with a different scheme(restarted with a different
> > > > chunkMaxMessageSize) and accept those new chunks from the beginning.
> > > >
> > > > Regarding this, it seems like we are implementing dynamic
> > > > configuration for the chunkMaxMessageSize. I'm afraid that this would
> > > > change the expected behavior and introduce more complexity to this
> > > > configuration.
> > > >
> > > >
> > > > [0] https://github.com/apache/pulsar/pull/14007
> > > >
> > > > BR,
> > > > Zike Yang
> > > >
> > > > On Thu, Aug 24, 2023 at 2:21 PM Zike Yang <z...@apache.org> wrote:
> > > > >
> > > > > Hi, xiangying
> > > > >
> > > > > > it will find that the message
> > > > > is out of order and rewind the cursor. Loop this operation, and
> > > > > discard this message after it expires instead of assembling 3, 4, 5
> > > > > into a message.
> > > > >
> > > > > Could you point out where the implementation for this?  From my
> > > > > understanding, there should not be any rewind operation for the
> > > > > chunking feature. You can check more detail here:
> > > > > https://streamnative.io/blog/deep-dive-into-message-chunking-in-pulsar#how-message-chunking-is-implemented
> > > > >
> > > > > > This solution also cannot solve the out-of-order messages inside the
> > > > > chunks. For example, the above five messages will still be persisted.
> > > > >
> > > > > The consumer already handles this case. The above 5 messages will all
> > > > > be persisted but the consumer will skip message 1 and 2.
> > > > > For messages 3, 4, and 5. The producer can guarantee these chunks are 
> > > > > in order.
> > > > >
> > > > > BR,
> > > > > Zike Yang
> > > > >
> > > > > On Thu, Aug 24, 2023 at 11:48 AM Yubiao Feng
> > > > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > > > >
> > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > 3. SequenceID: 0, ChunkID: 0
> > > > > > > 4. SequenceID: 0, ChunkID: 1
> > > > > > > 5. SequenceID: 0, ChunkID: 2
> > > > > > > For the existing behavior, the consumer assembles
> > > > > > > messages 3,4,5 into
> > > > > > > the original large message. But the changes brought
> > > > > > > about by this PIP
> > > > > > > will cause the consumer to use messages 1,2,5 for
> > > > > > > assembly. There is
> > > > > > > no guarantee that the producer will split the message
> > > > > > > in the same way
> > > > > > > twice before and after. For example, the producer's
> > > > > > > maxMessageSize may
> > > > > > > be different. This may cause the consumer to
> > > > > > > receive a corrupt
> > > > > > > message.
> > > > > >
> > > > > > Good point.
> > > > > >
> > > > > >
> > > > > > Thanks
> > > > > > Yubiao Feng
> > > > > >
> > > > > > On Wed, Aug 23, 2023 at 12:34 PM Zike Yang <z...@apache.org> wrote:
> > > > > >
> > > > > > > Hi, xiangying,
> > > > > > >
> > > > > > > Thanks for your PIP.
> > > > > > >
> > > > > > > IIUC, this may change the existing behavior and may introduce
> > > > > > > inconsistencies.
> > > > > > > Suppose that we have a large message with 3 chunks. But the 
> > > > > > > producer
> > > > > > > crashes and resends the message after sending the chunk-1. It will
> > > > > > > send a total of 5 messages to the Pulsar topic:
> > > > > > >
> > > > > > > 1. SequenceID: 0, ChunkID: 0
> > > > > > > 2. SequenceID: 0, ChunkID: 1
> > > > > > > 3. SequenceID: 0, ChunkID: 0   -> This message will be dropped
> > > > > > > 4. SequenceID: 0, ChunkID: 1    -> Will also be dropped
> > > > > > > 5. SequenceID: 0, ChunkID: 2    -> The last chunk of the message
> > > > > > >
> > > > > > > For the existing behavior, the consumer assembles messages 3,4,5 
> > > > > > > into
> > > > > > > the original large message. But the changes brought about by this 
> > > > > > > PIP
> > > > > > > will cause the consumer to use messages 1,2,5 for assembly. There 
> > > > > > > is
> > > > > > > no guarantee that the producer will split the message in the same 
> > > > > > > way
> > > > > > > twice before and after. For example, the producer's 
> > > > > > > maxMessageSize may
> > > > > > > be different. This may cause the consumer to receive a corrupt
> > > > > > > message.
> > > > > > >
> > > > > > > Also, this PIP increases the complexity of handling chunks on the
> > > > > > > broker side. Brokers should, in general, treat the chunk as a 
> > > > > > > normal
> > > > > > > message.
> > > > > > >
> > > > > > > I think a simple better approach is to only check the 
> > > > > > > deduplication
> > > > > > > for the last chunk of the large message. The consumer only gets 
> > > > > > > the
> > > > > > > whole message after receiving the last chunk. We don't need to 
> > > > > > > check
> > > > > > > the deduplication for all previous chunks. Also by doing this we 
> > > > > > > only
> > > > > > > need bug fixes, we don't need to introduce a new PIP.
> > > > > > >
> > > > > > > BR,
> > > > > > > Zike Yang
> > > > > > >
> > > > > > > On Fri, Aug 18, 2023 at 7:54 PM Xiangying Meng 
> > > > > > > <xiangy...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Dear Community,
> > > > > > > >
> > > > > > > > I hope this email finds you well. I'd like to address an 
> > > > > > > > important
> > > > > > > > issue related to Apache Pulsar and discuss a solution I've 
> > > > > > > > proposed on
> > > > > > > > GitHub. The problem pertains to the handling of Chunk Messages 
> > > > > > > > after
> > > > > > > > enabling deduplication.
> > > > > > > >
> > > > > > > > In the current version of Apache Pulsar, all chunks of a Chunk 
> > > > > > > > Message
> > > > > > > > share the same sequence ID. However, enabling the depublication
> > > > > > > > feature results in an inability to send Chunk Messages. To 
> > > > > > > > tackle this
> > > > > > > > problem, I've proposed a solution [1] that ensures messages are 
> > > > > > > > not
> > > > > > > > duplicated throughout end-to-end delivery. While this fix 
> > > > > > > > addresses
> > > > > > > > the duplication issue for end-to-end messages, there remains a
> > > > > > > > possibility of duplicate chunks within topics.
> > > > > > > >
> > > > > > > > To address this concern, I believe we should introduce a "Chunk 
> > > > > > > > ID
> > > > > > > > map" at the Broker level, similar to the existing "sequence ID 
> > > > > > > > map",
> > > > > > > > to facilitate effective filtering. However, implementing this 
> > > > > > > > has led
> > > > > > > > to a challenge: a producer requires storage for two Long values
> > > > > > > > simultaneously (sequence ID and chunk ID). Because the snapshot 
> > > > > > > > of the
> > > > > > > > sequence ID map is stored through the properties of the cursor
> > > > > > > > (Map<String, Long>), so in order to satisfy the storage of two 
> > > > > > > > Longs
> > > > > > > > (sequence ID, chunk ID) corresponding to one producer, we hope 
> > > > > > > > to add
> > > > > > > > a mark DeleteProperties (Map<String, Long>) String, String>) to
> > > > > > > > replace the properties (Map<String, Long>) field. To resolve 
> > > > > > > > this,
> > > > > > > > I've proposed an alternative proposal [2] involving the 
> > > > > > > > introduction
> > > > > > > > of a "mark DeleteProperties" (Map<String, String>) to replace 
> > > > > > > > the
> > > > > > > > current properties (Map<String, Long>) field.
> > > > > > > >
> > > > > > > > I'd appreciate it if you carefully review both PRs and share 
> > > > > > > > your
> > > > > > > > valuable feedback and insights. Thank you immensely for your 
> > > > > > > > time and
> > > > > > > > attention. I eagerly anticipate your valuable opinions and
> > > > > > > > recommendations.
> > > > > > > >
> > > > > > > > Warm regards,
> > > > > > > > Xiangying
> > > > > > > >
> > > > > > > > [1] https://github.com/apache/pulsar/pull/20948
> > > > > > > > [2] https://github.com/apache/pulsar/pull/21027
> > > > > > >

Reply via email to