This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0cc5e81 Issue #1452: remove reachedEndOfTopic in addConsumer (#2301) 0cc5e81 is described below commit 0cc5e81fcc5ac3859ee463556e6c1ed7e05d29a3 Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Mon Aug 6 07:49:42 2018 +0800 Issue #1452: remove reachedEndOfTopic in addConsumer (#2301) ### Motivation Fixes #1452 In issue #1452 , reachedEndOfTopic was called twice if a topic has been terminated before subscription. It may be better to call `reachedEndOfTopic`, when real read/ack happened to the subscription, so delete the calling in `addConsumer` to avoid dup calling. ### Modifications remove dup calling in `addConsumer` . add related ut. ### Result Expected all ut passed. --- .../service/persistent/PersistentSubscription.java | 5 --- .../client/api/SimpleProducerConsumerTest.java | 42 ++++++++++++++++++++-- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 8a77bfa..0842742 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -103,11 +103,6 @@ public class PersistentSubscription implements Subscription { throw new SubscriptionFencedException("Subscription is fenced"); } - if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) { - // Immediately notify the consumer that there are no more available messages - consumer.reachedEndOfTopic(); - } - if (dispatcher == null || !dispatcher.isConsumerConnected()) { switch (consumer.subType()) { case Exclusive: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 79166a1..61bdad0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -2434,7 +2434,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1") .addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4) .cryptoKeyReader(new EncKeyReader()).create(); - + Consumer<byte[]> consumer = pulsarClient.newConsumer().topicsPattern("persistent://my-property/my-ns/myrsa-topic1") .subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) .subscribe(); @@ -2450,7 +2450,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { consumer.close(); log.info("-- Exiting {} test --", methodName); } - + private String decryptMessage(TopicMessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader) throws Exception { Optional<EncryptionContext> ctx = msg.getEncryptionCtx(); @@ -2624,4 +2624,42 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + // Issue 1452: https://github.com/apache/incubator-pulsar/issues/1452 + // reachedEndOfTopic should be called only once if a topic has been terminated before subscription + @Test + public void testReachedEndOfTopic() throws Exception + { + String topicName = "persistent://my-property/my-ns/testReachedEndOfTopic"; + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(false).create(); + producer.close(); + + admin.topics().terminateTopicAsync(topicName).get(); + + CountDownLatch latch = new CountDownLatch(2); + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName("my-subscriber-name") + .messageListener(new MessageListener() + { + @Override + public void reachedEndOfTopic(Consumer consumer) + { + log.info("called reachedEndOfTopic {}", methodName); + latch.countDown(); + } + + @Override + public void received(Consumer consumer, Message message) + { + // do nothing + } + }) + .subscribe(); + + assertFalse(latch.await(1, TimeUnit.SECONDS)); + assertEquals(latch.getCount(), 1); + consumer.close(); + } }