This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 518120d PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298) 518120d is described below commit 518120df6a202dfcfd40f8ea657632f6d5a94a61 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Wed Feb 28 09:01:46 2018 -0800 PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298) * auto discovery of pattern subscribe topics changes; auto sub/unsub * change following @Matteo's comments * change to use timer --- .../client/impl/PatternTopicsConsumerImplTest.java | 355 ++++++++++++++++++++- .../apache/pulsar/client/api/ConsumerBuilder.java | 9 + .../pulsar/client/impl/ConsumerBuilderImpl.java | 7 + .../client/impl/PatternTopicsConsumerImpl.java | 139 +++++++- .../pulsar/client/impl/PulsarClientImpl.java | 30 +- .../pulsar/client/impl/TopicsConsumerImpl.java | 11 +- .../impl/conf/ConsumerConfigurationData.java | 2 + 7 files changed, 535 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 26f3277..c82c525 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; @@ -28,14 +29,14 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.IntStream; +import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; 
-import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,8 +137,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { admin.persistentTopics().createPartitionedTopic(topicName3, 3); // 2. create producer - ProducerConfiguration producerConfiguration = new ProducerConfiguration(); - producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); String messagePredicate = "my-message-" + key + "-"; int totalMessages = 30; @@ -152,6 +151,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { Consumer consumer = pulsarClient.newConsumer() .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) .subscriptionName(subscriptionName) .subscriptionType(SubscriptionType.Shared) .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) @@ -201,4 +201,351 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { producer3.close(); } + @Test(timeOut = testTimeout) + public void testTopicsPatternFilter() throws Exception { + String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1"; + String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2"; + String topicName3 = "persistent://prop/use/ns-abc/hello-3"; + + List<String> topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3); + + Pattern pattern1 = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*"); + List<String> result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1); + assertTrue(result1.size() == 2 && result1.contains(topicName1) && 
result1.contains(topicName2)); + + Pattern pattern2 = Pattern.compile("persistent://prop/use/ns-abc/.*"); + List<String> result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2); + assertTrue(result2.size() == 3 && + result2.contains(topicName1) && + result2.contains(topicName2) && + result2.contains(topicName3)); + } + + @Test(timeOut = testTimeout) + public void testTopicsListMinus() throws Exception { + String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1"; + String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2"; + String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3"; + String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4"; + String topicName5 = "persistent://prop/use/ns-abc/pattern-topic-5"; + String topicName6 = "persistent://prop/use/ns-abc/pattern-topic-6"; + + List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); + List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6); + + List<String> addedNames = PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); + List<String> removedNames = PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); + + assertTrue(addedNames.size() == 2 && + addedNames.contains(topicName5) && + addedNames.contains(topicName6)); + assertTrue(removedNames.size() == 2 && + removedNames.contains(topicName1) && + removedNames.contains(topicName2)); + + // totally 2 different list, should return content of first lists. + List<String> addedNames2 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); + assertTrue(addedNames2.size() == 2 && + addedNames2.contains(topicName5) && + addedNames2.contains(topicName6)); + + // 2 same list, should return empty list. 
+ List<String> addedNames3 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); + assertEquals(addedNames3.size(), 0); + + // empty list minus: addedNames2.size = 2, addedNames3.size = 0 + List<String> addedNames4 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); + assertTrue(addedNames4.size() == addedNames2.size()); + addedNames4.forEach(name -> assertTrue(addedNames2.contains(name))); + + List<String> addedNames5 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); + assertEquals(addedNames5.size(), 0); + } + + // simulate subscribe a pattern which has no topics, but then matched topics added in. + @Test(timeOut = testTimeout) + public void testStartEmptyPatternConsumer() throws Exception { + String key = "StartEmptyPatternConsumerTest"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key; + String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key; + String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key; + Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*"); + + // 1. create partition + admin.properties().createProperty("prop", new PropertyAdmin()); + admin.persistentTopics().createPartitionedTopic(topicName2, 2); + admin.persistentTopics().createPartitionedTopic(topicName3, 3); + + // 2. Create consumer, this should success, but with empty sub-consumser internal + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + // 3. verify consumer get methods, to get 0 number of partitions and topics. 
+ assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern()); + assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0); + assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 0); + assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 0); + + // 4. create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 30; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + + // 5. call recheckTopics to subscribe each added topics above + log.debug("recheck topics change"); + PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer); + consumer1.run(consumer1.getRecheckPatternTimeout()); + Thread.sleep(100); + + // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. + assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern()); + assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6); + assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3); + + + // 7. produce data + for (int i = 0; i < totalMessages / 3; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + } + + // 8. 
should receive all the message + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + } + + // simulate subscribe a pattern which has 3 topics, but then matched topic added in. + @Test(timeOut = testTimeout) + public void testAutoSubscribePatternConsumer() throws Exception { + String key = "AutoSubscribePatternConsumer"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key; + String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key; + String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key; + Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*"); + + // 1. create partition + admin.properties().createProperty("prop", new PropertyAdmin()); + admin.persistentTopics().createPartitionedTopic(topicName2, 2); + admin.persistentTopics().createPartitionedTopic(topicName3, 3); + + // 2. 
create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 30; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + assertTrue(consumer instanceof PatternTopicsConsumerImpl); + + // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 + assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern()); + assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6); + assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3); + + // 5. 
produce data to topic 1,2,3; verify should receive all the message + for (int i = 0; i < totalMessages / 3; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + } + + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + // 6. create another producer with 4 partitions + String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4-" + key; + admin.persistentTopics().createPartitionedTopic(topicName4, 4); + Producer producer4 = pulsarClient.newProducer().topic(topicName4) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + + // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 + log.debug("recheck topics change"); + PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer); + consumer1.run(consumer1.getRecheckPatternTimeout()); + Thread.sleep(100); + assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 10); + assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 10); + assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 4); + + // 8. 
produce data to topic3 and topic4, verify should receive all the message + for (int i = 0; i < totalMessages / 2; i++) { + producer3.send((messagePredicate + "round2-producer4-" + i).getBytes()); + producer4.send((messagePredicate + "round2-producer4-" + i).getBytes()); + } + + messageSet = 0; + message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + producer4.close(); + } + + @Test(timeOut = testTimeout) + public void testAutoUnbubscribePatternConsumer() throws Exception { + String key = "AutoUnsubscribePatternConsumer"; + String subscriptionName = "my-ex-subscription-" + key; + String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key; + String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key; + String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key; + Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*"); + + // 1. create partition + admin.properties().createProperty("prop", new PropertyAdmin()); + admin.persistentTopics().createPartitionedTopic(topicName2, 2); + admin.persistentTopics().createPartitionedTopic(topicName3, 3); + + // 2. 
create producer + String messagePredicate = "my-message-" + key + "-"; + int totalMessages = 30; + + Producer producer1 = pulsarClient.newProducer().topic(topicName1) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + Producer producer3 = pulsarClient.newProducer().topic(topicName3) + .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition) + .create(); + + Consumer consumer = pulsarClient.newConsumer() + .topicsPattern(pattern) + .patternAutoDiscoveryPeriod(2) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .subscribe(); + + assertTrue(consumer instanceof PatternTopicsConsumerImpl); + + // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 + assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern()); + assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6); + assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3); + + // 5. 
produce data to topic 1,2,3; verify should receive all the message + for (int i = 0; i < totalMessages / 3; i++) { + producer1.send((messagePredicate + "producer1-" + i).getBytes()); + producer2.send((messagePredicate + "producer2-" + i).getBytes()); + producer3.send((messagePredicate + "producer3-" + i).getBytes()); + } + + int messageSet = 0; + Message message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + // 6. remove producer 1,3; verify only consumer 2 left + // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic. + List<String> topicNames = Lists.newArrayList(topicName2); + NamespaceService nss = pulsar.getNamespaceService(); + doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("prop", "use", "ns-abc")); + + // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 + log.debug("recheck topics change"); + PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer); + consumer1.run(consumer1.getRecheckPatternTimeout()); + Thread.sleep(100); + assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2); + assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 2); + assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 1); + + // 8. 
produce data to topic2, verify should receive all the message + for (int i = 0; i < totalMessages; i++) { + producer2.send((messagePredicate + "round2-producer2-" + i).getBytes()); + } + + messageSet = 0; + message = consumer.receive(); + do { + assertTrue(message instanceof TopicMessageImpl); + messageSet ++; + consumer.acknowledge(message); + log.debug("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(500, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + consumer.unsubscribe(); + consumer.close(); + producer1.close(); + producer2.close(); + producer3.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index cf63320..c487425 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -232,6 +232,15 @@ public interface ConsumerBuilder extends Serializable, Cloneable { ConsumerBuilder readCompacted(boolean readCompacted); /** + * Set topics auto discovery period when using a pattern for topics consumer. + * The period is in minutes; the default and minimum value is 1 minute. + * + * @param periodInMinutes + * topics auto discovery period, in minutes + */ + ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes); + + /** * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) 
</br> * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index edaeb5b..4c9caa1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -201,4 +201,11 @@ public class ConsumerBuilderImpl implements ConsumerBuilder { conf.setReadCompacted(readCompacted); return this; } + + @Override + public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) { + conf.setPatternAutoDiscoveryPeriod(periodInMinutes); + return this; + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java index b09ad48..04445ea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java @@ -18,16 +18,32 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.naming.NamespaceName; +import 
org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PatternTopicsConsumerImpl extends TopicsConsumerImpl { +public class PatternTopicsConsumerImpl extends TopicsConsumerImpl implements TimerTask { private final Pattern topicsPattern; + private final TopicsChangedListener topicsChangeListener; + private volatile Timeout recheckPatternTimeout = null; public PatternTopicsConsumerImpl(Pattern topicsPattern, PulsarClientImpl client, @@ -36,11 +52,132 @@ public class PatternTopicsConsumerImpl extends TopicsConsumerImpl { CompletableFuture<Consumer> subscribeFuture) { super(client, conf, listenerExecutor, subscribeFuture); this.topicsPattern = topicsPattern; + + if (this.namespaceName == null) { + this.namespaceName = getNameSpaceFromPattern(topicsPattern); + } + checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString())); + + this.topicsChangeListener = new PatternTopicsChangedListener(); + recheckPatternTimeout = client.timer().newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES); + } + + public static NamespaceName getNameSpaceFromPattern(Pattern pattern) { + return TopicName.get(pattern.pattern()).getNamespaceObject(); + } + + // TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change. 
+ @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled()) { + return; + } + + CompletableFuture<Void> recheckFuture = new CompletableFuture<>(); + List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(2); + + client.getLookup().getTopicsUnderNamespace(namespaceName).thenAccept(topics -> { + if (log.isDebugEnabled()) { + log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); + topics.forEach(topicName -> + log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName)); + } + + List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); + List<String> oldTopics = PatternTopicsConsumerImpl.this.getTopics(); + + futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); + futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); + FutureUtil.waitForAll(futures) + .thenAccept(finalFuture -> recheckFuture.complete(null)) + .exceptionally(ex -> { + log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage()); + recheckFuture.completeExceptionally(ex); + return null; + }); + }); + + // schedule the next re-check task + recheckPatternTimeout = client.timer().newTimeout(PatternTopicsConsumerImpl.this, + Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES); + } public Pattern getPattern() { return this.topicsPattern; } + interface TopicsChangedListener { + // unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on removed topics. + CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics); + // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`. 
+ CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics); + } + + private class PatternTopicsChangedListener implements TopicsChangedListener { + @Override + public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) { + CompletableFuture<Void> removeFuture = new CompletableFuture<>(); + + if (removedTopics.isEmpty()) { + removeFuture.complete(null); + return removeFuture; + } + + List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size()); + removedTopics.stream().forEach(topic -> futures.add(unsubscribeAsync(topic))); + FutureUtil.waitForAll(futures) + .thenAccept(finalFuture -> removeFuture.complete(null)) + .exceptionally(ex -> { + log.warn("[{}] Failed to unsubscribe topics: {}", topic, ex.getMessage()); + removeFuture.completeExceptionally(ex); + return null; + }); + return removeFuture; + } + + @Override + public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) { + CompletableFuture<Void> addFuture = new CompletableFuture<>(); + + if (addedTopics.isEmpty()) { + addFuture.complete(null); + return addFuture; + } + + List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size()); + addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic))); + FutureUtil.waitForAll(futures) + .thenAccept(finalFuture -> addFuture.complete(null)) + .exceptionally(ex -> { + log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage()); + addFuture.completeExceptionally(ex); + return null; + }); + return addFuture; + } + } + + // get topics, which are contained in list1, and not in list2 + public static List<String> topicsListsMinus(List<String> list1, List<String> list2) { + HashSet<String> s1 = new HashSet<>(list1); + s1.removeAll(list2); + return s1.stream().collect(Collectors.toList()); + } + + @Override + public CompletableFuture<Void> closeAsync() { + Timeout timeout = recheckPatternTimeout; + if (timeout != null) { + timeout.cancel(); + 
recheckPatternTimeout = null; + } + return super.closeAsync(); + } + + @VisibleForTesting + Timeout getRecheckPatternTimeout() { + return recheckPatternTimeout; + } + private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index c07ec8b..fd3cada 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Consumer; @@ -402,13 +403,13 @@ public class PulsarClientImpl implements PulsarClient { CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>(); lookup.getTopicsUnderNamespace(namespaceName) .thenAccept(topics -> { - List<String> topicsList = topics.stream() - .filter(topic -> { - TopicName destinationName = TopicName.get(topic); - checkState(destinationName.getNamespaceObject().equals(namespaceName)); - return conf.getTopicsPattern().matcher(destinationName.toString()).matches(); - }) - .collect(Collectors.toList()); + if (log.isDebugEnabled()) { + log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size()); + topics.forEach(topicName -> + log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName)); + } + + List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern()); conf.getTopicNames().addAll(topicsList); ConsumerBase consumer = new PatternTopicsConsumerImpl(conf.getTopicsPattern(), PulsarClientImpl.this, @@ 
-429,6 +430,17 @@ public class PulsarClientImpl implements PulsarClient { return consumerSubscribedFuture; } + // get topics that match 'topicsPattern' from original topics list + // return result should contain only topic names, without partition part + public static List<String> topicsPatternFilter(List<String> original, Pattern topicsPattern) { + return original.stream() + .filter(topic -> { + TopicName destinationName = TopicName.get(topic); + return topicsPattern.matcher(destinationName.toString()).matches(); + }) + .collect(Collectors.toList()); + } + @Override public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException { @@ -619,6 +631,10 @@ public class PulsarClientImpl implements PulsarClient { return eventLoopGroup; } + public LookupService getLookup() { + return lookup; + } + public CompletableFuture<Integer> getNumberOfPartitions(String topic) { return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java index 492c014..4a8e663 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java @@ -38,7 +38,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.IntStream; - import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -64,7 +63,7 @@ public class TopicsConsumerImpl extends ConsumerBase { private final ConcurrentHashMap<String, ConsumerImpl> consumers; // Map <topic, partitionNumber>, store partition number for each topic - private final ConcurrentHashMap<String, 
Integer> topics; + protected final ConcurrentHashMap<String, Integer> topics; // Queue of partition consumers on which we have stopped calling receiveAsync() because the // shared incoming queue was full @@ -146,8 +145,8 @@ public class TopicsConsumerImpl extends ConsumerBase { // - every topic has same namespace, // - topic names are unique. private static boolean topicNamesValid(Collection<String> topics) { - checkState(topics != null && topics.size() > 1, - "topics should should contain more than 1 topics"); + checkState(topics != null && topics.size() >= 1, + "topics should should contain more than 1 topic"); final String namespace = TopicName.get(topics.stream().findFirst().get()).getNamespace(); @@ -689,8 +688,8 @@ public class TopicsConsumerImpl extends ConsumerBase { consumers.values().stream() .filter(consumer1 -> { String consumerTopicName = consumer1.getTopic(); - if (TopicName.get(consumerTopicName) - .getPartitionedTopicName().equals(topicName)) { + if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( + TopicName.get(topicName).getPartitionedTopicName().toString())) { return true; } else { return false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 5acfe9d..72df4ae 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -75,6 +75,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private boolean readCompacted = false; + private int patternAutoDiscoveryPeriod = 1; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); -- To stop receiving notification emails like this one, please contact mme...@apache.org.