This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 1de29156052 [fix][client] Fix failover/exclusive consumer with batch
cumulate ack issue. (#18454)
1de29156052 is described below
commit 1de291560524218ad35be6bbbc30e3acc95b53ee
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Nov 15 11:37:16 2022 +0800
[fix][client] Fix failover/exclusive consumer with batch cumulate ack
issue. (#18454)
(cherry picked from commit 7712aa396bfca55a79a45761e0a405e90185e0f8)
---
.../impl/ConsumerDedupPermitsUpdateTest.java | 21 ++++--
.../pulsar/client/impl/NegativeAcksTest.java | 77 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 9 ++-
.../client/impl/MultiTopicsConsumerImpl.java | 3 -
4 files changed, 100 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index 4c9922acbec..ceb7d7fd484 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -116,10 +116,23 @@ public class ConsumerDedupPermitsUpdateTest extends
ProducerConsumerBase {
}
producer.flush();
- for (int i = 0; i < 30; i++) {
- Message<String> msg = consumer.receive();
- assertEquals(msg.getValue(), "new-message-" + i);
- consumer.acknowledge(msg);
+ if (batchingEnabled) {
+ for (int i = 0; i < 30; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals(msg.getValue(), "hello-" + i);
+ consumer.acknowledge(msg);
+ }
+ for (int i = 0; i < 30; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals(msg.getValue(), "new-message-" + i);
+ consumer.acknowledge(msg);
+ }
+ } else {
+ for (int i = 0; i < 30; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals(msg.getValue(), "new-message-" + i);
+ consumer.acknowledge(msg);
+ }
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 5eb43af38f7..de130b78270 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertNull;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -154,4 +156,79 @@ public class NegativeAcksTest extends ProducerConsumerBase
{
consumer.close();
producer.close();
}
+
+ @Test
+ public void testFailoverConsumerBatchCumulateAck() throws Exception {
+ final String topic = BrokerTestUtil.newUniqueName("my-topic");
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ @Cleanup
+ Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .enableBatchIndexAcknowledgment(true)
+ .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(10)
+ .subscribe();
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .batchingMaxMessages(10)
+ .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+ .blockIfQueueFull(true)
+ .create();
+
+ int count = 0;
+ Set<Integer> datas = new HashSet<>();
+ CountDownLatch producerLatch = new CountDownLatch(10);
+ while (count < 10) {
+ datas.add(count);
+ producer.sendAsync(count).whenComplete((m, e) -> {
+ producerLatch.countDown();
+ });
+ count++;
+ }
+ producerLatch.await();
+ CountDownLatch consumerLatch = new CountDownLatch(1);
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ consumer.receiveAsync()
+ .thenCompose(m -> {
+ log.info("received one msg : {}",
m.getMessageId());
+ datas.remove(m.getValue());
+ return consumer.acknowledgeCumulativeAsync(m);
+ })
+ .thenAccept(ignore -> {
+ try {
+ Thread.sleep(500);
+ consumer.redeliverUnacknowledgedMessages();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .whenComplete((r, e) -> {
+ consumerLatch.countDown();
+ });
+ }
+ }).start();
+ consumerLatch.await();
+ Thread.sleep(500);
+ count = 0;
+ while(true) {
+ Message<Integer> msg = consumer.receive(5, TimeUnit.SECONDS);
+ if (msg == null) {
+ break;
+ }
+ consumer.acknowledgeCumulative(msg);
+ Thread.sleep(200);
+ datas.remove(msg.getValue());
+ log.info("received msg : {}", msg.getMessageId());
+ count++;
+ }
+ Assert.assertEquals(count, 9);
+ Assert.assertEquals(0, datas.size());
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1a185d4c17d..5381b280330 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1174,9 +1174,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final int numMessages = msgMetadata.getNumMessagesInBatch();
final int numChunks = msgMetadata.hasNumChunksFromMsg() ?
msgMetadata.getNumChunksFromMsg() : 0;
final boolean isChunkedMessage = numChunks > 1 &&
conf.getSubscriptionType() != SubscriptionType.Shared;
-
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex());
- if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
+ if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
+ && acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message as it was already being
acked earlier by same consumer {}/{}",
topic, subscription, consumerName, msgId);
@@ -1429,7 +1429,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
skippedMessages++;
continue;
}
-
+ }
+ if
(acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
+ skippedMessages++;
+ continue;
}
executeNotifyCallback(message);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e5cc32af8fe..b2de0e3b92c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -985,7 +985,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
partitionIndex -> {
String partitionName =
TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer =
createInternalConsumer(configurationData, partitionName,
partitionIndex, subFuture,
createIfDoesNotExist, schema);
synchronized (pauseMutex) {
@@ -1012,7 +1011,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
return existingValue;
} else {
- internalConfig.setStartPaused(paused);
ConsumerImpl<T> newConsumer =
createInternalConsumer(internalConfig, topicName,
-1, subFuture, createIfDoesNotExist, schema);
@@ -1328,7 +1326,6 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
int partitionIndex =
TopicName.getPartitionIndex(partitionName);
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
ConsumerConfigurationData<T> configurationData =
getInternalConsumerConfig();
- configurationData.setStartPaused(paused);
ConsumerImpl<T> newConsumer =
createInternalConsumer(configurationData, partitionName,
partitionIndex, subFuture, true, schema);
synchronized (pauseMutex) {