This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 12cca8a5c9cef45eba0cd72c3df9d30c95931af1 Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jul 28 23:49:42 2020 +0800 Fix backward compatibility issues with batch index acknowledgment. (#7655) ### Motivation Fix backward compatibility issues with batch index acknowledgment. ### Modifications Disable batch index acknowledgment by default at the consumer side. (cherry picked from commit fffd9f144bb14a220d17e951fea29b16ad2db103) --- .../client/impl/BatchMessageIndexAckTest.java | 3 ++ .../apache/pulsar/client/api/ConsumerBuilder.java | 6 +++ .../pulsar/client/impl/ConsumerBuilderImpl.java | 6 +++ .../apache/pulsar/client/impl/ConsumerImpl.java | 8 ++-- .../impl/conf/ConsumerConfigurationData.java | 2 + ...t2_2.java => PulsarStandaloneTestSuite2_5.java} | 26 +++++++----- .../backwardscompatibility/SmokeTest2_2.java | 4 ++ .../backwardscompatibility/SmokeTest2_3.java | 4 ++ .../backwardscompatibility/SmokeTest2_4.java | 4 ++ .../{SmokeTest2_2.java => SmokeTest2_5.java} | 6 ++- .../integration/containers/PulsarContainer.java | 1 + .../integration/topologies/PulsarTestBase.java | 46 ++++++++++++++++++++++ 12 files changed, 102 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java index 3150f10..582d461 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java @@ -67,6 +67,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { .subscriptionName("sub") .receiverQueueSize(100) .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS) .subscribe(); @@ -125,6 +126,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { .topic(topic) .subscriptionName("sub") .receiverQueueSize(100) + .enableBatchIndexAcknowledgment(true) .subscribe(); @Cleanup @@ -194,6 +196,7 @@ public class BatchMessageIndexAckTest extends ProducerConsumerBase { Consumer<byte[]> consumer = pulsarClient.newConsumer() .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS) .topic(topic) + .enableBatchIndexAcknowledgment(true) .subscriptionName("test") .subscribe(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index def1807..9bb8ce0 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -610,6 +610,12 @@ public interface ConsumerBuilder<T> extends Cloneable { ConsumerBuilder<T> enableRetry(boolean retryEnable); /** + * Enable or disable the batch index acknowledgment. To enable this feature must ensure batch index acknowledgment + * feature is enabled at the broker side. + */ + ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled); + + /** * Consumer buffers chunk messages into memory until it receives all the chunks of the original message. While * consuming chunk-messages, chunks from same message might not be contiguous in the stream and they might be mixed * with other messages' chunks. so, consumer has to maintain multiple buffers to manage chunks coming from different diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 9240ab3..64eadfe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -416,6 +416,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { } @Override + public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean batchIndexAcknowledgmentEnabled) { + conf.setBatchIndexAckEnabled(batchIndexAcknowledgmentEnabled); + return this; + } + + @Override public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long duration, TimeUnit unit) { conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration)); return null; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 10320da..28995ad 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -543,9 +543,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle ackType); } } else { - BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; - acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(), - batchMessageId.getBatchSize(), ackType, properties); + if (conf.isBatchIndexAckEnabled()) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, batchMessageId.getBatchIndex(), + batchMessageId.getBatchSize(), ackType, properties); + } // other messages in batch are still pending ack. return CompletableFuture.completedFuture(null); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index eb82703..14595e6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -125,6 +125,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable { private KeySharedPolicy keySharedPolicy; + private boolean batchIndexAckEnabled = false; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java similarity index 56% copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java index 7c1c2a1..cf88ca2 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java @@ -16,21 +16,27 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pulsar.tests.integration.backwardscompatibility; -import org.testng.annotations.Test; +import org.apache.pulsar.tests.integration.containers.PulsarContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase; +import org.testng.ITest; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; -public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 { +public class PulsarStandaloneTestSuite2_5 extends PulsarStandaloneTestBase implements ITest { - @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testPublishAndConsume(serviceUrl, isPersistent); + @BeforeSuite + public void setUpCluster() throws Exception { + super.startCluster(PulsarContainer.PULSAR_2_5_IMAGE_NAME); } - @Test(dataProvider = "StandaloneServiceUrlAndTopics") - public void testBatchMessagePublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { - super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); + @AfterSuite + public void tearDownCluster() throws Exception { + super.stopCluster(); + } + @Override + public String getTestName() { + return "pulsar-standalone-suite"; } - } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java index 7c1c2a1..20e9926 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java @@ -33,4 +33,8 @@ public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 { super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); } + @Test(dataProvider = "StandaloneServiceUrlAndTopics") + public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception { + super.testBatchIndexAckDisabled(serviceUrl); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java index ab317d0..e1b37e3 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java @@ -33,4 +33,8 @@ public class SmokeTest2_3 extends PulsarStandaloneTestSuite2_3 { super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); } + @Test(dataProvider = "StandaloneServiceUrlAndTopics") + public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception { + super.testBatchIndexAckDisabled(serviceUrl); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java index d74ad8e..eb77eaa 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java @@ -33,4 +33,8 @@ public class SmokeTest2_4 extends PulsarStandaloneTestSuite2_4 { super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); } + @Test(dataProvider = "StandaloneServiceUrlAndTopics") + public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception { + super.testBatchIndexAckDisabled(serviceUrl); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java similarity index 83% copy from tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java copy to tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java index 7c1c2a1..2bcf584 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java @@ -21,7 +21,7 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; import org.testng.annotations.Test; -public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 { +public class SmokeTest2_5 extends PulsarStandaloneTestSuite2_5 { @Test(dataProvider = "StandaloneServiceUrlAndTopics") public void testPublishAndConsume(String serviceUrl, boolean isPersistent) throws Exception { @@ -33,4 +33,8 @@ public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 { super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent); } + @Test(dataProvider = "StandaloneServiceUrlAndTopics") + public void testBatchIndexAckDisabled(String serviceUrl, boolean isPersistent) throws Exception { + super.testBatchIndexAckDisabled(serviceUrl); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java index 15c99cc..ae5e57a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java @@ -41,6 +41,7 @@ public abstract class PulsarContainer<SelfT extends PulsarContainer<SelfT>> exte public static final int BROKER_HTTP_PORT = 8080; public static final String DEFAULT_IMAGE_NAME = "apachepulsar/pulsar-test-latest-version:latest"; + public static final String PULSAR_2_5_IMAGE_NAME = "apachepulsar/pulsar:2.5.0"; public static final String PULSAR_2_4_IMAGE_NAME = "apachepulsar/pulsar:2.4.0"; public static final String PULSAR_2_3_IMAGE_NAME = "apachepulsar/pulsar:2.3.0"; public static final String PULSAR_2_2_IMAGE_NAME = "apachepulsar/pulsar:2.2.0"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java index 9e75e95..da58713 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java @@ -24,13 +24,17 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.util.FutureUtil; +import org.junit.Assert; public class PulsarTestBase { @@ -130,4 +134,46 @@ public class PulsarTestBase { } } + public void testBatchIndexAckDisabled(String serviceUrl) throws Exception { + String topicName = generateTopicName("test-batch-index-ack-disabled", true); + final int numMessages = 100; + try (PulsarClient client = PulsarClient.builder() + .serviceUrl(serviceUrl) + .build()) { + + try (Consumer<Integer> consumer = client.newConsumer(Schema.INT32) + .topic(topicName) + .subscriptionName("sub") + .receiverQueueSize(100) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(false) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe();) { + + try (Producer<Integer> producer = client.newProducer(Schema.INT32) + .topic(topicName) + .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS) + .create()) { + + List<CompletableFuture<MessageId>> futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + futures.add(producer.sendAsync(i)); + } + // Wait for all messages are publish succeed. + FutureUtil.waitForAll(futures).get(); + } + + for (int i = 0; i < numMessages; i++) { + Message<Integer> m = consumer.receive(); + if (i % 2 == 0) { + consumer.acknowledge(m); + } + } + + Message<Integer> redelivery = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(redelivery); + } + } + } + }