This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 638eb55  Fixed ack message tracker cleanup for single message batches (#2224)
638eb55 is described below

commit 638eb55970f2d30eb09282731f0402903b5ee469
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jul 25 02:00:16 2018 -0700

    Fixed ack message tracker cleanup for single message batches (#2224)
    
    ### Motivation
    
    Fixes #2221
    
    The issue is that, on individual ack, we were removing a
    `BatchMessageIdImpl` from the un-acked message tracker, whose tracking
    set contains the batch-wide ids (plain `MessageIdImpl`).
---
 .../impl/UnAcknowledgedMessagesTimeoutTest.java    | 32 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 28 +++++++++----------
 2 files changed, 46 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
index 0b5126b..da53760 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
@@ -30,6 +32,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -386,4 +389,33 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase {
         assertEquals(consumer.getUnAckedMessageTracker().size(), 0);
     }
 
+
+    @Test
+    public void testSingleMessageBatch() throws Exception {
+        String topicName = "prop/ns-abc/topic-estSingleMessageBatch";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("subscription")
+                .ackTimeout(1, TimeUnit.HOURS)
+                .subscribe();
+
+        // Force the creation of a batch with a single message
+        producer.sendAsync("hello");
+        producer.flush();
+
+        Message<String> message = consumer.receive();
+
+        assertFalse(((ConsumerImpl<?>) 
consumer).getUnAckedMessageTracker().isEmpty());
+
+        consumer.acknowledge(message);
+
+        assertTrue(((ConsumerImpl<?>) 
consumer).getUnAckedMessageTracker().isEmpty());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f78119..1c69c75 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -385,11 +385,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 log.debug("[{}] [{}] can ack message to broker {}, acktype {}, 
cardinality {}, length {}", subscription,
                         consumerName, batchMessageId, ackType, 
outstandingAcks, batchSize);
             }
-            // increment Acknowledge-msg counter with number of messages in 
batch only if AckType is Individual.
-            // CumulativeAckType is handled while sending ack to broker
-            if (ackType == AckType.Individual) {
-                stats.incrementNumAcksSent(batchSize);
-            }
             return true;
         } else {
             if (AckType.Cumulative == ackType
@@ -433,11 +428,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                     Map<String,Long> 
properties) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
 
-
         if (ackType == AckType.Individual) {
-            unAckedMessageTracker.remove(msgId);
-            // increment counter by 1 for non-batch msg
-            if (!(messageId instanceof BatchMessageIdImpl)) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
+
+                stats.incrementNumAcksSent(batchMessageId.getBatchSize());
+                unAckedMessageTracker.remove(new 
MessageIdImpl(batchMessageId.getLedgerId(),
+                        batchMessageId.getEntryId(), 
batchMessageId.getPartitionIndex()));
+            } else {
+                // increment counter by 1 for non-batch msg
+                unAckedMessageTracker.remove(msgId);
                 stats.incrementNumAcksSent(1);
             }
         } else if (ackType == AckType.Cumulative) {
@@ -717,16 +717,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, 
msgMetadata, payload, cnx);
-        
+
         boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
-        
+
         if (decryptedPayload == null) {
             // Message was discarded or CryptoKeyReader isn't implemented
             return;
         }
-        
+
         // uncompress decryptedPayload and release decryptedPayload-ByteBuf
-        ByteBuf uncompressedPayload = isMessageUndecryptable ? 
decryptedPayload.retain() 
+        ByteBuf uncompressedPayload = isMessageUndecryptable ? 
decryptedPayload.retain()
                 : uncompressPayloadIfNeeded(messageId, msgMetadata, 
decryptedPayload, cnx);
         decryptedPayload.release();
         if (uncompressedPayload == null) {
@@ -1348,7 +1348,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     /**
      * Create EncryptionContext if message payload is encrypted
-     * 
+     *
      * @param msgMetadata
      * @return {@link Optional}<{@link EncryptionContext}>
      */

Reply via email to