geniusjoe commented on code in PR #1464:
URL: https://github.com/apache/pulsar-client-go/pull/1464#discussion_r2746397714


##########
pulsar/consumer_partition.go:
##########
@@ -1456,31 +1456,137 @@ func (pc *partitionConsumer) 
processMessageChunk(compressedPayload internal.Buff
                partitionIdx: pc.partitionIdx,
        }
 
-       if msgMeta.GetChunkId() == 0 {
+       if chunkID == 0 {
+               // Handle ack hole case when receive duplicated chunks.
+               // There are two situation that receives chunks with the same 
sequence ID and chunk ID.
+               // Situation 1 - Message redeliver:
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+               // In this case, chunk-3 and chunk-4 have the same msgID with 
chunk-1 and chunk-2.
+               // This may be caused by message redeliver, we can't ack any 
chunk in this case here.
+               // Situation 2 - Corrupted chunk message
+               // For example:
+               //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+               //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+               //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+               //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+               //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+               // In this case, all the chunks with different msgIDs and are 
persistent in the topic.
+               // But Chunk-1 and Chunk-2 belong to a corrupted chunk message 
that must be skipped since
+               // they will not be delivered to end users. So we should ack 
them here to avoid ack hole.
+               if pc.chunkedMsgCtxMap.get(uuid) != nil {
+                       ctx := pc.chunkedMsgCtxMap.get(uuid)

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to