This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f3b50d6c114 [fix][client] Avoid ack hole for chunk message (#21101)
f3b50d6c114 is described below
commit f3b50d6c114f12ec8e65fa0234f8ab796ed5cf33
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Sep 4 08:50:49 2023 +0800
[fix][client] Avoid ack hole for chunk message (#21101)
## Motivation
Handle ack hole case:
For example:
```markdown
Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
```
Consumer ack chunk message via ChunkMessageIdImpl that consists of all the
chunks in this chunk
message(Chunk-3, Chunk-4, Chunk-5). The Chunk-1 and Chunk-2 are not
included in the
ChunkMessageIdImpl, so we should process it here.
## Modification
Ack chunk-1 and chunk-2.
---
.../pulsar/client/impl/MessageChunkingTest.java | 33 ++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 46 ++++++++++++++++++----
2 files changed, 72 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index dffa0035248..f266afd8a2e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -356,6 +356,38 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
assertNull(consumer.receive(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testResendChunkMessagesWithoutAckHole() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ final String topicName =
"persistent://my-property/my-ns/testResendChunkMessagesWithoutAckHole";
+ final String subName = "my-subscriber-name";
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subName)
+ .maxPendingChunkedMessage(10)
+ .autoAckOldestChunkedMessageOnQueueFull(true)
+ .subscribe();
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .chunkMaxMessageSize(100)
+ .enableChunking(true)
+ .enableBatching(false)
+ .create();
+
+ sendSingleChunk(producer, "0", 0, 2);
+
+ sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk
+ sendSingleChunk(producer, "0", 1, 2);
+
+ Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|");
+ consumer.acknowledge(receivedMsg);
+
assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName)
+ .getNonContiguousDeletedMessagesRanges(), 0);
+ }
+
@Test
public void testResendChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -395,6 +427,7 @@ public class MessageChunkingTest extends
ProducerConsumerBase {
receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
consumer.acknowledge(receivedMsg);
+ Assert.assertEquals(((ConsumerImpl<String>)
consumer).getAvailablePermits(), 8);
}
/**
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8263e7f3198..3a542e66104 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1413,7 +1413,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private ByteBuf processMessageChunk(ByteBuf compressedPayload,
MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {
-
+ if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() -
1)) {
+ increaseAvailablePermits(cnx);
+ }
// Lazy task scheduling to expire incomplete chunk message
if (expireTimeOfIncompleteChunkedMessageMillis > 0 &&
expireChunkMessageTaskScheduled.compareAndSet(false,
true)) {
@@ -1429,6 +1431,37 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (msgMetadata.getChunkId() == 0) {
if (chunkedMsgCtx != null) {
+ // Handle ack hole case when receive duplicated chunks.
+ // There are two situation that receives chunks with the same
sequence ID and chunk ID.
+ // Situation 1 - Message redeliver:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgID with
chunk-1 and chunk-2.
+ // This may be caused by message redeliver, we can't ack any
chunk in this case here.
+ // Situation 2 - Corrupted chunk message
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks with different msgIDs and are
persistent in the topic.
+ // But Chunk-1 and Chunk-2 belong to a corrupted chunk message
that must be skipped since
+ // they will not be delivered to end users. So we should ack
them here to avoid ack hole.
+ boolean isCorruptedChunkMessageDetected =
Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+ .noneMatch(messageId1 -> messageId1 != null &&
messageId1.ledgerId == messageId.getLedgerId()
+ && messageId1.entryId ==
messageId.getEntryId());
+ if (isCorruptedChunkMessageDetected) {
+
Arrays.stream(chunkedMsgCtx.chunkedMessageIds).forEach(messageId1 -> {
+ if (messageId1 != null) {
+ doAcknowledge(messageId1, AckType.Individual,
Collections.emptyMap(), null);
+ }
+ });
+ }
// The first chunk of a new chunked-message received before
receiving other chunks of previous
// chunked-message
// so, remove previous chunked-message from map and release
buffer
@@ -1468,11 +1501,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
msgMetadata.getProducerName(), msgId,
chunkedMsgCtx.lastChunkedMessageId,
msgMetadata.getChunkId(), msgMetadata.getSequenceId());
compressedPayload.release();
- increaseAvailablePermits(cnx);
- boolean repeatedlyReceived =
Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
- .anyMatch(messageId1 -> messageId1 != null &&
messageId1.ledgerId == messageId.getLedgerId()
+ // Just like the above logic of receiving the first chunk
again. We only ack this chunk in the message
+ // duplication case.
+ boolean isDuplicatedChunk =
Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+ .noneMatch(messageId1 -> messageId1 != null &&
messageId1.ledgerId == messageId.getLedgerId()
&& messageId1.entryId ==
messageId.getEntryId());
- if (!repeatedlyReceived) {
+ if (isDuplicatedChunk) {
doAcknowledge(msgId, AckType.Individual,
Collections.emptyMap(), null);
}
return null;
@@ -1489,7 +1523,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
chunkedMessagesMap.remove(msgMetadata.getUuid());
compressedPayload.release();
- increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() >
(msgMetadata.getPublishTime()
+ expireTimeOfIncompleteChunkedMessageMillis)) {
@@ -1508,7 +1541,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// if final chunk is not received yet then release payload and return
if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() -
1)) {
compressedPayload.release();
- increaseAvailablePermits(cnx);
return null;
}