This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 12cca8a5c9cef45eba0cd72c3df9d30c95931af1
Author: lipenghui <peng...@apache.org>
AuthorDate: Tue Jul 28 23:49:42 2020 +0800

    Fix backward compatibility issues with batch index acknowledgment. (#7655)
    
    ### Motivation
    
    Fix backward compatibility issues with batch index acknowledgment.
    
    ### Modifications
    
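    Disable batch index acknowledgment by default at the consumer side. It can be
    enabled per consumer with `ConsumerBuilder#enableBatchIndexAcknowledgment(true)`,
    provided the feature is also enabled at the broker side.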
    
    (cherry picked from commit fffd9f144bb14a220d17e951fea29b16ad2db103)
---
 .../client/impl/BatchMessageIndexAckTest.java      |  3 ++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  6 +++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  6 +++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  8 ++--
 .../impl/conf/ConsumerConfigurationData.java       |  2 +
 ...t2_2.java => PulsarStandaloneTestSuite2_5.java} | 26 +++++++-----
 .../backwardscompatibility/SmokeTest2_2.java       |  4 ++
 .../backwardscompatibility/SmokeTest2_3.java       |  4 ++
 .../backwardscompatibility/SmokeTest2_4.java       |  4 ++
 .../{SmokeTest2_2.java => SmokeTest2_5.java}       |  6 ++-
 .../integration/containers/PulsarContainer.java    |  1 +
 .../integration/topologies/PulsarTestBase.java     | 46 ++++++++++++++++++++++
 12 files changed, 102 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 3150f10..582d461 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -67,6 +67,7 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
             .subscriptionName("sub")
             .receiverQueueSize(100)
             .subscriptionType(SubscriptionType.Shared)
+            .enableBatchIndexAcknowledgment(true)
             .negativeAckRedeliveryDelay(2, TimeUnit.SECONDS)
             .subscribe();
 
@@ -125,6 +126,7 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
             .topic(topic)
             .subscriptionName("sub")
             .receiverQueueSize(100)
+            .enableBatchIndexAcknowledgment(true)
             .subscribe();
 
         @Cleanup
@@ -194,6 +196,7 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS)
                 .topic(topic)
+                .enableBatchIndexAcknowledgment(true)
                 .subscriptionName("test")
                 .subscribe();
 
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index def1807..9bb8ce0 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -610,6 +610,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> enableRetry(boolean retryEnable);
 
     /**
+     * Enable or disable batch index acknowledgment. To enable this 
feature, batch index acknowledgment must also be
+     * enabled at the broker side.
+     */
+    ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean 
batchIndexAcknowledgmentEnabled);
+
+    /**
      * Consumer buffers chunk messages into memory until it receives all the 
chunks of the original message. While
      * consuming chunk-messages, chunks from same message might not be 
contiguous in the stream and they might be mixed
      * with other messages' chunks. so, consumer has to maintain multiple 
buffers to manage chunks coming from different
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 9240ab3..64eadfe 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -416,6 +416,12 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     }
 
     @Override
+    public ConsumerBuilder<T> enableBatchIndexAcknowledgment(boolean 
batchIndexAcknowledgmentEnabled) {
+        conf.setBatchIndexAckEnabled(batchIndexAcknowledgmentEnabled);
+        return this;
+    }
+
+    @Override
     public ConsumerBuilder<T> expireTimeOfIncompleteChunkedMessage(long 
duration, TimeUnit unit) {
         
conf.setExpireTimeOfIncompleteChunkedMessageMillis(unit.toMillis(duration));
         return null;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 10320da..28995ad 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -543,9 +543,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             ackType);
                 }
             } else {
-                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
-                
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, 
batchMessageId.getBatchIndex(),
-                    batchMessageId.getBatchSize(), ackType, properties);
+                if (conf.isBatchIndexAckEnabled()) {
+                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
+                    
acknowledgmentsGroupingTracker.addBatchIndexAcknowledgment(batchMessageId, 
batchMessageId.getBatchIndex(),
+                            batchMessageId.getBatchSize(), ackType, 
properties);
+                }
                 // other messages in batch are still pending ack.
                 return CompletableFuture.completedFuture(null);
             }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index eb82703..14595e6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -125,6 +125,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private KeySharedPolicy keySharedPolicy;
 
+    private boolean batchIndexAckEnabled = false;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
similarity index 56%
copy from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
copy to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
index 7c1c2a1..cf88ca2 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/PulsarStandaloneTestSuite2_5.java
@@ -16,21 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.tests.integration.backwardscompatibility;
 
-import org.testng.annotations.Test;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarStandaloneTestBase;
+import org.testng.ITest;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
 
-public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
+public class PulsarStandaloneTestSuite2_5 extends PulsarStandaloneTestBase 
implements ITest {
 
-    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
-    public void testPublishAndConsume(String serviceUrl, boolean isPersistent) 
throws Exception {
-        super.testPublishAndConsume(serviceUrl, isPersistent);
+    @BeforeSuite
+    public void setUpCluster() throws Exception {
+        super.startCluster(PulsarContainer.PULSAR_2_5_IMAGE_NAME);
     }
 
-    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
-    public void testBatchMessagePublishAndConsume(String serviceUrl, boolean 
isPersistent) throws Exception {
-        super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
+    @AfterSuite
+    public void tearDownCluster() throws Exception {
+        super.stopCluster();
+    }
+    @Override
+    public String getTestName() {
+        return "pulsar-standalone-suite";
     }
-
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
index 7c1c2a1..20e9926 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
@@ -33,4 +33,8 @@ public class SmokeTest2_2 extends 
PulsarStandaloneTestSuite2_2 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean 
isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
index ab317d0..e1b37e3 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_3.java
@@ -33,4 +33,8 @@ public class SmokeTest2_3 extends 
PulsarStandaloneTestSuite2_3 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean 
isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
index d74ad8e..eb77eaa 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_4.java
@@ -33,4 +33,8 @@ public class SmokeTest2_4 extends 
PulsarStandaloneTestSuite2_4 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean 
isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
similarity index 83%
copy from 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
copy to 
tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
index 7c1c2a1..2bcf584 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_2.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/SmokeTest2_5.java
@@ -21,7 +21,7 @@ package 
org.apache.pulsar.tests.integration.backwardscompatibility;
 
 import org.testng.annotations.Test;
 
-public class SmokeTest2_2 extends PulsarStandaloneTestSuite2_2 {
+public class SmokeTest2_5 extends PulsarStandaloneTestSuite2_5 {
 
     @Test(dataProvider = "StandaloneServiceUrlAndTopics")
     public void testPublishAndConsume(String serviceUrl, boolean isPersistent) 
throws Exception {
@@ -33,4 +33,8 @@ public class SmokeTest2_2 extends 
PulsarStandaloneTestSuite2_2 {
         super.testBatchMessagePublishAndConsume(serviceUrl, isPersistent);
     }
 
+    @Test(dataProvider = "StandaloneServiceUrlAndTopics")
+    public void testBatchIndexAckDisabled(String serviceUrl, boolean 
isPersistent) throws Exception {
+        super.testBatchIndexAckDisabled(serviceUrl);
+    }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
index 15c99cc..ae5e57a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java
@@ -41,6 +41,7 @@ public abstract class PulsarContainer<SelfT extends 
PulsarContainer<SelfT>> exte
     public static final int BROKER_HTTP_PORT = 8080;
 
     public static final String DEFAULT_IMAGE_NAME = 
"apachepulsar/pulsar-test-latest-version:latest";
+    public static final String PULSAR_2_5_IMAGE_NAME = 
"apachepulsar/pulsar:2.5.0";
     public static final String PULSAR_2_4_IMAGE_NAME = 
"apachepulsar/pulsar:2.4.0";
     public static final String PULSAR_2_3_IMAGE_NAME = 
"apachepulsar/pulsar:2.3.0";
     public static final String PULSAR_2_2_IMAGE_NAME = 
"apachepulsar/pulsar:2.2.0";
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
index 9e75e95..da58713 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
@@ -24,13 +24,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.junit.Assert;
 
 public class PulsarTestBase {
 
@@ -130,4 +134,46 @@ public class PulsarTestBase {
         }
     }
 
+    public void testBatchIndexAckDisabled(String serviceUrl) throws Exception {
+        String topicName = generateTopicName("test-batch-index-ack-disabled", 
true);
+        final int numMessages = 100;
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .build()) {
+
+            try (Consumer<Integer> consumer = client.newConsumer(Schema.INT32)
+                    .topic(topicName)
+                    .subscriptionName("sub")
+                    .receiverQueueSize(100)
+                    .subscriptionType(SubscriptionType.Shared)
+                    .enableBatchIndexAcknowledgment(false)
+                    .ackTimeout(1, TimeUnit.SECONDS)
+                    .subscribe();) {
+
+                try (Producer<Integer> producer = 
client.newProducer(Schema.INT32)
+                        .topic(topicName)
+                        .batchingMaxPublishDelay(50, TimeUnit.MILLISECONDS)
+                        .create()) {
+
+                    List<CompletableFuture<MessageId>> futures = new 
ArrayList<>();
+                    for (int i = 0; i < numMessages; i++) {
+                        futures.add(producer.sendAsync(i));
+                    }
+                    // Wait until all messages have been published successfully.
+                    FutureUtil.waitForAll(futures).get();
+                }
+
+                for (int i = 0; i < numMessages; i++) {
+                    Message<Integer> m = consumer.receive();
+                    if (i % 2 == 0) {
+                        consumer.acknowledge(m);
+                    }
+                }
+
+                Message<Integer> redelivery = consumer.receive(3, 
TimeUnit.SECONDS);
+                Assert.assertNotNull(redelivery);
+            }
+        }
+    }
+
 }

Reply via email to