This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f3b50d6c114 [fix][client] Avoid ack hole for chunk message (#21101)
f3b50d6c114 is described below

commit f3b50d6c114f12ec8e65fa0234f8ab796ed5cf33
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Sep 4 08:50:49 2023 +0800

    [fix][client] Avoid ack hole for chunk message (#21101)
    
    ## Motivation
    Avoid an ack hole when a chunked message is resent starting from its first chunk.
    For example:
    ```markdown
                         Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
                         Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
                         Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
                         Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
                         Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
    ```
    The consumer acknowledges the chunked message through a ChunkMessageIdImpl that
    covers only the chunks of the complete message (Chunk-3, Chunk-4, Chunk-5).
    Chunk-1 and Chunk-2 are not included in that ChunkMessageIdImpl, so they are
    never acknowledged and leave an ack hole unless the consumer handles them
    separately.
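    
    ## Modification
    When the first chunk (chunk ID 0) of a chunked message is received again with a
    message ID that differs from every chunk already buffered, acknowledge the
    buffered chunks of the previous, incomplete attempt (Chunk-1 and Chunk-2 in the
    example above). If the repeated chunk has the same message ID as a buffered
    chunk, it is a redelivery and nothing is acknowledged.
    Also increase the available permits for every non-final chunk at the start of
    chunk processing, replacing the separate calls previously made on individual
    return paths.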
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 33 ++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 46 ++++++++++++++++++----
 2 files changed, 72 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index dffa0035248..f266afd8a2e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -356,6 +356,38 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         assertNull(consumer.receive(5, TimeUnit.SECONDS));
     }
 
+    @Test
+    public void testResendChunkMessagesWithoutAckHole() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        final String topicName = 
"persistent://my-property/my-ns/testResendChunkMessagesWithoutAckHole";
+        final String subName = "my-subscriber-name";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .maxPendingChunkedMessage(10)
+                .autoAckOldestChunkedMessageOnQueueFull(true)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .chunkMaxMessageSize(100)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+
+        sendSingleChunk(producer, "0", 0, 2);
+
+        sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk
+        sendSingleChunk(producer, "0", 1, 2);
+
+        Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+        assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|");
+        consumer.acknowledge(receivedMsg);
+        
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName)
+                .getNonContiguousDeletedMessagesRanges(), 0);
+    }
+
     @Test
     public void testResendChunkMessages() throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -395,6 +427,7 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
         assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
         consumer.acknowledge(receivedMsg);
+        Assert.assertEquals(((ConsumerImpl<String>) 
consumer).getAvailablePermits(), 8);
     }
 
     /**
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8263e7f3198..3a542e66104 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1413,7 +1413,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private ByteBuf processMessageChunk(ByteBuf compressedPayload, 
MessageMetadata msgMetadata, MessageIdImpl msgId,
             MessageIdData messageId, ClientCnx cnx) {
-
+        if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 
1)) {
+            increaseAvailablePermits(cnx);
+        }
         // Lazy task scheduling to expire incomplete chunk message
         if (expireTimeOfIncompleteChunkedMessageMillis > 0 && 
expireChunkMessageTaskScheduled.compareAndSet(false,
                 true)) {
@@ -1429,6 +1431,37 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         if (msgMetadata.getChunkId() == 0) {
             if (chunkedMsgCtx != null) {
+                // Handle ack hole case when receive duplicated chunks.
+                // There are two situation that receives chunks with the same 
sequence ID and chunk ID.
+                // Situation 1 - Message redeliver:
+                // For example:
+                //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+                //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+                //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+                //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+                //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+                // In this case, chunk-3 and chunk-4 have the same msgID with 
chunk-1 and chunk-2.
+                // This may be caused by message redeliver, we can't ack any 
chunk in this case here.
+                // Situation 2 - Corrupted chunk message
+                // For example:
+                //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+                //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+                //     Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+                //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+                //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+                // In this case, all the chunks with different msgIDs and are 
persistent in the topic.
+                // But Chunk-1 and Chunk-2 belong to a corrupted chunk message 
that must be skipped since
+                // they will not be delivered to end users. So we should ack 
them here to avoid ack hole.
+                boolean isCorruptedChunkMessageDetected = 
Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+                        .noneMatch(messageId1 -> messageId1 != null && 
messageId1.ledgerId == messageId.getLedgerId()
+                                && messageId1.entryId == 
messageId.getEntryId());
+                if (isCorruptedChunkMessageDetected) {
+                    
Arrays.stream(chunkedMsgCtx.chunkedMessageIds).forEach(messageId1 -> {
+                        if (messageId1 != null) {
+                            doAcknowledge(messageId1, AckType.Individual, 
Collections.emptyMap(), null);
+                        }
+                    });
+                }
                 // The first chunk of a new chunked-message received before 
receiving other chunks of previous
                 // chunked-message
                 // so, remove previous chunked-message from map and release 
buffer
@@ -1468,11 +1501,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         msgMetadata.getProducerName(), msgId, 
chunkedMsgCtx.lastChunkedMessageId,
                         msgMetadata.getChunkId(), msgMetadata.getSequenceId());
                 compressedPayload.release();
-                increaseAvailablePermits(cnx);
-                boolean repeatedlyReceived = 
Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
-                        .anyMatch(messageId1 -> messageId1 != null && 
messageId1.ledgerId == messageId.getLedgerId()
+                // Just like the above logic of receiving the first chunk 
again. We only ack this chunk in the message
+                // duplication case.
+                boolean isDuplicatedChunk = 
Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+                        .noneMatch(messageId1 -> messageId1 != null && 
messageId1.ledgerId == messageId.getLedgerId()
                                 && messageId1.entryId == 
messageId.getEntryId());
-                if (!repeatedlyReceived) {
+                if (isDuplicatedChunk) {
                     doAcknowledge(msgId, AckType.Individual, 
Collections.emptyMap(), null);
                 }
                 return null;
@@ -1489,7 +1523,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
             chunkedMessagesMap.remove(msgMetadata.getUuid());
             compressedPayload.release();
-            increaseAvailablePermits(cnx);
             if (expireTimeOfIncompleteChunkedMessageMillis > 0
                     && System.currentTimeMillis() > 
(msgMetadata.getPublishTime()
                             + expireTimeOfIncompleteChunkedMessageMillis)) {
@@ -1508,7 +1541,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         // if final chunk is not received yet then release payload and return
         if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 
1)) {
             compressedPayload.release();
-            increaseAvailablePermits(cnx);
             return null;
         }
 

Reply via email to