This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 638eb55 Fixed ack message tracker cleanup for single message batches (#2224) 638eb55 is described below commit 638eb55970f2d30eb09282731f0402903b5ee469 Author: Matteo Merli <mme...@apache.org> AuthorDate: Wed Jul 25 02:00:16 2018 -0700 Fixed ack message tracker cleanup for single message batches (#2224) ### Motivation Fixes #2221 The issue is that we were removing a batch message id impl from the tracking set which contains the batch-wide ids. --- .../impl/UnAcknowledgedMessagesTimeoutTest.java | 32 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 28 +++++++++---------- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java index 0b5126b..da53760 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/UnAcknowledgedMessagesTimeoutTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import java.util.HashSet; import java.util.concurrent.TimeUnit; @@ -30,6 +32,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -386,4 +389,33 @@ public class UnAcknowledgedMessagesTimeoutTest extends BrokerTestBase { assertEquals(consumer.getUnAckedMessageTracker().size(), 0); } + + @Test + public void testSingleMessageBatch() throws Exception { + String topicName = "prop/ns-abc/topic-estSingleMessageBatch"; + + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(true) + .batchingMaxPublishDelay(10, TimeUnit.SECONDS) + .create(); + + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("subscription") + .ackTimeout(1, TimeUnit.HOURS) + .subscribe(); + + // Force the creation of a batch with a single message + producer.sendAsync("hello"); + producer.flush(); + + Message<String> message = consumer.receive(); + + assertFalse(((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().isEmpty()); + + consumer.acknowledge(message); + + assertTrue(((ConsumerImpl<?>) consumer).getUnAckedMessageTracker().isEmpty()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6f78119..1c69c75 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -385,11 +385,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle log.debug("[{}] [{}] can ack message to broker {}, acktype {}, cardinality {}, length {}", subscription, consumerName, batchMessageId, ackType, outstandingAcks, batchSize); } - // increment Acknowledge-msg counter with number of messages in batch only if AckType is Individual. - // CumulativeAckType is handled while sending ack to broker - if (ackType == AckType.Individual) { - stats.incrementNumAcksSent(batchSize); - } return true; } else { if (AckType.Cumulative == ackType @@ -433,11 +428,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle Map<String,Long> properties) { MessageIdImpl msgId = (MessageIdImpl) messageId; - if (ackType == AckType.Individual) { - unAckedMessageTracker.remove(msgId); - // increment counter by 1 for non-batch msg - if (!(messageId instanceof BatchMessageIdImpl)) { + if (messageId instanceof BatchMessageIdImpl) { + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + + stats.incrementNumAcksSent(batchMessageId.getBatchSize()); + unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(), + batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); + } else { + // increment counter by 1 for non-batch msg + unAckedMessageTracker.remove(msgId); stats.incrementNumAcksSent(1); } } else if (ackType == AckType.Cumulative) { @@ -717,16 +717,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgMetadata, payload, cnx); - + boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata); - + if (decryptedPayload == null) { // Message was discarded or CryptoKeyReader isn't implemented return; } - + // uncompress decryptedPayload and release decryptedPayload-ByteBuf - ByteBuf uncompressedPayload = isMessageUndecryptable ? decryptedPayload.retain() + ByteBuf uncompressedPayload = isMessageUndecryptable ? decryptedPayload.retain() : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx); decryptedPayload.release(); if (uncompressedPayload == null) { @@ -1348,7 +1348,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle /** * Create EncryptionContext if message payload is encrypted - * + * * @param msgMetadata * @return {@link Optional}<{@link EncryptionContext}> */