This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3a5df1a Issue 3230: auto update topic partitions extend for consumer and producer (#3513) 3a5df1a is described below commit 3a5df1ae71a29873fdd91295763342b90f9bc224 Author: Jia Zhai <zhai...@apache.org> AuthorDate: Fri Feb 8 04:51:47 2019 +0800 Issue 3230: auto update topic partitions extend for consumer and producer (#3513) * consumer auto subscribe * producer update for partiton extends * change following Matteo's comments * enable by default --- .../api/PartitionedProducerConsumerTest.java | 112 ++++++++++ .../pulsar/client/impl/TopicsConsumerImplTest.java | 88 ++++++++ .../apache/pulsar/client/api/ConsumerBuilder.java | 9 + .../apache/pulsar/client/api/ProducerBuilder.java | 9 + .../pulsar/client/impl/ConsumerBuilderImpl.java | 7 +- .../client/impl/MultiTopicsConsumerImpl.java | 237 ++++++++++++++++----- .../client/impl/PartitionedProducerImpl.java | 126 ++++++++++- .../client/impl/PartitionsChangedListener.java | 33 +++ .../impl/PatternMultiTopicsConsumerImpl.java | 2 +- .../pulsar/client/impl/ProducerBuilderImpl.java | 5 + .../impl/conf/ConsumerConfigurationData.java | 2 + .../impl/conf/ProducerConfigurationData.java | 2 + 12 files changed, 578 insertions(+), 54 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index 137c57d..55952b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import io.netty.util.Timeout; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.common.naming.TopicName; @@ -667,6 +669,116 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { Collections.singletonList(nonPartitionedTopic)); } + + /** + * It verifies that consumer producer auto update for partitions extend. + * + * Steps: + * 1. create topic with 2 partitions, and producer consumer + * 2. update partition from 2 to 3. + * 3. trigger auto update in producer, after produce, consumer will only get messages from 2 partitions. + * 4. trigger auto update in consumer, after produce, consumer will get all messages from 3 partitions. + * + * @throws Exception + */ + @Test(timeOut = 30000) + public void testAutoUpdatePartitionsForProducerConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection + + final int numPartitions = 2; + final String topicName = "persistent://my-property/my-ns/my-topic-" + System.currentTimeMillis(); + final String producerMsg = "producerMsg"; + final int totalMessages = 30; + + admin.topics().createPartitionedTopic(topicName, numPartitions); + + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .enableBatching(false) + .autoUpdatePartitions(true) + .create(); + + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName) + .subscriptionName("my-partitioned-subscriber") + .subscriptionType(SubscriptionType.Shared) + .autoUpdatePartitions(true) + .subscribe(); + + // 1. produce and consume 2 partitions + for (int i = 0; i < totalMessages; i++) { + producer.send((producerMsg + " first round " + "message index: " + i).getBytes()); + } + int messageSet = 0; + Message<byte[]> message = consumer.receive(); + do { + messageSet ++; + consumer.acknowledge(message); + log.info("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + // 2. update partition from 2 to 3. + admin.topics().updatePartitionedTopic(topicName,3); + + // 3. trigger auto update in producer, after produce, consumer will get 2/3 messages. + log.info("trigger partitionsAutoUpdateTimerTask for producer"); + Timeout timeout = ((PartitionedProducerImpl<byte[]>)producer).getPartitionsAutoUpdateTimeout(); + timeout.task().run(timeout); + Thread.sleep(200); + + for (int i = 0; i < totalMessages; i++) { + producer.send((producerMsg + " second round " + "message index: " + i).getBytes()); + } + messageSet = 0; + message = consumer.receive(); + do { + messageSet ++; + consumer.acknowledge(message); + log.info("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages * 2 / 3); + + // 4. trigger auto update in consumer, after produce, consumer will get all messages. + log.info("trigger partitionsAutoUpdateTimerTask for consumer"); + timeout = ((MultiTopicsConsumerImpl<byte[]>)consumer).getPartitionsAutoUpdateTimeout(); + timeout.task().run(timeout); + Thread.sleep(200); + + // former produced messages + messageSet = 0; + message = consumer.receive(); + do { + messageSet ++; + consumer.acknowledge(message); + log.info("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages / 3); + + // former produced messages + for (int i = 0; i < totalMessages; i++) { + producer.send((producerMsg + " third round " + "message index: " + i).getBytes()); + } + messageSet = 0; + message = consumer.receive(); + do { + messageSet ++; + consumer.acknowledge(message); + log.info("Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, totalMessages); + + pulsarClient.close(); + admin.topics().deletePartitionedTopic(topicName); + + log.info("-- Exiting {} test --", methodName); + } + + private class AlwaysTwoMessageRouter implements MessageRouter { @Override public int choosePartition(Message<?> msg, TopicMetadata metadata) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index b791033..fc46586 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -20,10 +20,12 @@ package org.apache.pulsar.client.impl; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; +import io.netty.util.Timeout; import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -672,4 +674,90 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase { consumer.close(); } + + + /** + * Test topic partitions auto subscribed. + * + * Steps: + * 1. Create a consumer with 2 topics, and each topic has 2 partitions: xx-partition-0, xx-partition-1. + * 2. produce message to xx-partition-2, and verify consumer could not receive message. + * 3. update topics to have 3 partitions. + * 4. trigger partitionsAutoUpdate. this should be done automatically, this is to save time to manually trigger. + * 5. produce message to xx-partition-2 again, and verify consumer could receive message. + * + */ + @Test(timeOut = 30000) + public void testTopicAutoUpdatePartitions() throws Exception { + String key = "TestTopicAutoUpdatePartitions"; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 6; + + final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key; + final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key; + List<String> topicNames = Lists.newArrayList(topicName1, topicName2); + + admin.tenants().createTenant("prop", new TenantInfo()); + admin.topics().createPartitionedTopic(topicName1, 2); + admin.topics().createPartitionedTopic(topicName2, 2); + + // 1. Create a consumer + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .autoUpdatePartitions(true) + .subscribe(); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); + + MultiTopicsConsumerImpl topicsConsumer = (MultiTopicsConsumerImpl) consumer; + + // 2. use partition-2 producer, + Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1 + "-partition-2") + .enableBatching(false) + .create(); + Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2 + "-partition-2") + .enableBatching(false) + .create(); + for (int i = 0; i < totalMessages; i++) { + producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes()); + producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes()); + log.info("produce message to partition-2. message index: {}", i); + } + // since partition-2 not subscribed, could not receive any message. + Message<byte[]> message = consumer.receive(200, TimeUnit.MILLISECONDS); + assertNull(message); + + // 3. update to 3 partitions + admin.topics().updatePartitionedTopic(topicName1, 3); + admin.topics().updatePartitionedTopic(topicName2, 3); + + // 4. trigger partitionsAutoUpdate. this should be done automatically in 1 minutes, + // this is to save time to manually trigger. + log.info("trigger partitionsAutoUpdateTimerTask"); + Timeout timeout = topicsConsumer.getPartitionsAutoUpdateTimeout(); + timeout.task().run(timeout); + Thread.sleep(200); + + // 5. produce message to xx-partition-2 again, and verify consumer could receive message. + for (int i = 0; i < totalMessages; i++) { + producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes()); + producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes()); + log.info("produce message to partition-2 again. messageindex: {}", i); + } + int messageSet = 0; + message = consumer.receive(); + do { + messageSet ++; + consumer.acknowledge(message); + log.info("4 Consumer acknowledged : " + new String(message.getData())); + message = consumer.receive(200, TimeUnit.MILLISECONDS); + } while (message != null); + assertEquals(messageSet, 2 * totalMessages); + + consumer.close(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 5a8613d..f940640 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -368,4 +368,13 @@ public interface ConsumerBuilder<T> extends Cloneable { * then the ack timeout will be set to 30000 millisecond */ ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy); + + /** + * If enabled, the consumer will auto subscribe for partitions increasement. + * This is only for partitioned consumer. + * + * @param autoUpdate + * whether to auto update partition increasement + */ + ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index dd87685..3b01314 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -328,4 +328,13 @@ public interface ProducerBuilder<T> extends Cloneable { * @return producer builder. */ ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors); + + /** + * If enabled, partitioned producer will auto create new producers for new partitions. + * This is only for partitioned producer. + * + * @param autoUpdate + * whether to auto update partition increasement + */ + ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 09ba946..7e809ee 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -46,7 +46,6 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; @@ -293,6 +292,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> { return this; } + @Override + public ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate) { + conf.setAutoUpdatePartitions(autoUpdate); + return this; + } + public ConsumerConfigurationData<T> getConf() { return conf; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e9ef632..39c0511 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -21,6 +21,10 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,12 +44,12 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.IntStream; - import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; @@ -79,6 +83,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. AtomicInteger allTopicPartitionsNumber; + // timeout related to auto check and subscribe partition increasement + private volatile Timeout partitionsAutoUpdateTimeout = null; + TopicsPartitionChangedListener topicsPartitionChangedListener; + CompletableFuture<Void> partitionsAutoUpdateFuture = null; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStatsRecorder stats; private final UnAckedMessageTracker unAckedMessageTracker; @@ -111,6 +120,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { this.internalConfig = getInternalConsumerConfig(); this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null; + // start track and auto subscribe partition increasement + if (conf.isAutoUpdatePartitions()) { + topicsPartitionChangedListener = new TopicsPartitionChangedListener(); + partitionsAutoUpdateTimeout = client.timer() + .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES); + } + if (conf.getTopicNames().isEmpty()) { this.namespaceName = null; setState(State.Ready); @@ -126,20 +142,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { .collect(Collectors.toList()); FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> { - try { - if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { - setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); - } - setState(State.Ready); - // We have successfully created N consumers, so we can start receiving messages now - startReceivingMessages(new ArrayList<>(consumers.values())); - subscribeFuture().complete(MultiTopicsConsumerImpl.this); - log.info("[{}] [{}] Created topics consumer with {} sub-consumers", - topic, subscription, allTopicPartitionsNumber.get()); - } catch (PulsarClientException e) { - log.warn("[{}] Failed startReceivingMessages while subscribe topics: {}", topic, e.getMessage()); - subscribeFuture.completeExceptionally(e); - }}) + if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { + setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); + } + setState(State.Ready); + // We have successfully created N consumers, so we can start receiving messages now + startReceivingMessages(new ArrayList<>(consumers.values())); + log.info("[{}] [{}] Created topics consumer with {} sub-consumers", + topic, subscription, allTopicPartitionsNumber.get()); + subscribeFuture().complete(MultiTopicsConsumerImpl.this); + }) .exceptionally(ex -> { log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage()); subscribeFuture.completeExceptionally(ex); @@ -187,7 +199,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } } - private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) throws PulsarClientException { + private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) { if (log.isDebugEnabled()) { log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}", topic, newConsumers.size(), getState()); @@ -431,6 +443,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } setState(State.Closing); + if (partitionsAutoUpdateTimeout != null) { + partitionsAutoUpdateTimeout.cancel(); + partitionsAutoUpdateTimeout = null; + } + CompletableFuture<Void> closeFuture = new CompletableFuture<>(); List<CompletableFuture<Void>> futureList = consumers.values().stream() .map(c -> c.closeAsync()).collect(Collectors.toList()); @@ -719,39 +736,35 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { FutureUtil.waitForAll(futureList) .thenAccept(finalFuture -> { - try { - if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { - setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); - } - int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); - checkState(allTopicPartitionsNumber.get() == numTopics, - "allTopicPartitionsNumber " + allTopicPartitionsNumber.get() - + " not equals expected: " + numTopics); - - // We have successfully created new consumers, so we can start receiving messages for them - startReceivingMessages( - consumers.values().stream() - .filter(consumer1 -> { - String consumerTopicName = consumer1.getTopic(); - if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( - TopicName.get(topicName).getPartitionedTopicName().toString())) { - return true; - } else { - return false; - } - }) - .collect(Collectors.toList())); - - subscribeResult.complete(null); - log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}", - topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get()); - if (this.namespaceName == null) { - this.namespaceName = TopicName.get(topicName).getNamespaceObject(); - } - return; - } catch (PulsarClientException e) { - handleSubscribeOneTopicError(topicName, e, subscribeResult); + if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { + setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); } + int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); + checkState(allTopicPartitionsNumber.get() == numTopics, + "allTopicPartitionsNumber " + allTopicPartitionsNumber.get() + + " not equals expected: " + numTopics); + + // We have successfully created new consumers, so we can start receiving messages for them + startReceivingMessages( + consumers.values().stream() + .filter(consumer1 -> { + String consumerTopicName = consumer1.getTopic(); + if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( + TopicName.get(topicName).getPartitionedTopicName().toString())) { + return true; + } else { + return false; + } + }) + .collect(Collectors.toList())); + + subscribeResult.complete(null); + log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}", + topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get()); + if (this.namespaceName == null) { + this.namespaceName = TopicName.get(topicName).getNamespaceObject(); + } + return; }) .exceptionally(ex -> { handleSubscribeOneTopicError(topicName, ex, subscribeResult); @@ -800,6 +813,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed")); } + if (partitionsAutoUpdateTimeout != null) { + partitionsAutoUpdateTimeout.cancel(); + partitionsAutoUpdateTimeout = null; + } + CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>(); String topicPartName = TopicName.get(topicName).getPartitionedTopicName(); @@ -867,5 +885,128 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { consumers.forEach((name, consumer) -> consumer.resume()); } + // This listener is triggered when topics partitions are updated. + private class TopicsPartitionChangedListener implements PartitionsChangedListener { + // Check partitions changes of passed in topics, and subscribe new added partitions. + @Override + public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended) { + CompletableFuture<Void> future = new CompletableFuture<>(); + if (topicsExtended.isEmpty()) { + future.complete(null); + return future; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] run onTopicsExtended: {}, size: {}", + topic, topicsExtended.toString(), topicsExtended.size()); + } + + List<CompletableFuture<Void>> futureList = Lists.newArrayListWithExpectedSize(topicsExtended.size()); + topicsExtended.forEach(topic -> futureList.add(subscribeIncreasedTopicPartitions(topic))); + FutureUtil.waitForAll(futureList) + .thenAccept(finalFuture -> future.complete(null)) + .exceptionally(ex -> { + log.warn("[{}] Failed to subscribe increased topics partitions: {}", topic, ex.getMessage()); + future.completeExceptionally(ex); + return null; + }); + + return future; + } + } + + // subscribe increased partitions for a given topic + private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) { + CompletableFuture<Void> future = new CompletableFuture<>(); + + client.getPartitionsForTopic(topicName).thenCompose(list -> { + int oldPartitionNumber = topics.get(topicName.toString()); + int currentPartitionNumber = list.size(); + + if (log.isDebugEnabled()) { + log.debug("[{}] partitions number. old: {}, new: {}", + topicName.toString(), oldPartitionNumber, currentPartitionNumber); + } + + if (oldPartitionNumber == currentPartitionNumber) { + // topic partition number not changed + future.complete(null); + return future; + } else if (oldPartitionNumber < currentPartitionNumber) { + List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber); + // subscribe new added partitions + List<CompletableFuture<Consumer<T>>> futureList = newPartitions + .stream() + .map(partitionName -> { + int partitionIndex = TopicName.getPartitionIndex(partitionName); + CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); + ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig(); + ConsumerImpl<T> newConsumer = new ConsumerImpl<>( + client, partitionName, configurationData, + client.externalExecutorProvider().getExecutor(), + partitionIndex, subFuture, schema, interceptors); + consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + if (log.isDebugEnabled()) { + log.debug("[{}] create consumer {} for partitionName: {}", + topicName.toString(), newConsumer.getTopic(), partitionName); + } + return subFuture; + }) + .collect(Collectors.toList()); + + // wait for all partitions subscribe future complete, then startReceivingMessages + FutureUtil.waitForAll(futureList) + .thenAccept(finalFuture -> { + List<ConsumerImpl<T>> newConsumerList = newPartitions.stream() + .map(partitionTopic -> consumers.get(partitionTopic)) + .collect(Collectors.toList()); + + startReceivingMessages(newConsumerList); + future.complete(null); + }) + .exceptionally(ex -> { + log.warn("[{}] Failed to subscribe {} partition: {} - {}", + topic, topicName.toString(), oldPartitionNumber, currentPartitionNumber, ex.getMessage()); + future.completeExceptionally(ex); + return null; + }); + } else { + log.error("[{}] not support shrink topic partitions. old: {}, new: {}", + topicName.toString(), oldPartitionNumber, currentPartitionNumber); + future.completeExceptionally(new NotSupportedException("not support shrink topic partitions")); + } + return future; + }); + + return future; + } + + private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled() || getState() != State.Ready) { + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] run partitionsAutoUpdateTimerTask for multiTopicsConsumer: {}", topic); + } + + // if last auto update not completed yet, do nothing. + if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { + partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(topics.keySet()); + } + + // schedule the next re-check task + partitionsAutoUpdateTimeout = client.timer() + .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES); + } + }; + + @VisibleForTesting + public Timeout getPartitionsAutoUpdateTimeout() { + return partitionsAutoUpdateTimeout; + } + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 9b862df..bfc6645 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -21,13 +21,18 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; - import java.util.stream.Collectors; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -35,6 +40,7 @@ import org.apache.pulsar.client.api.MessageRouter; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; @@ -43,8 +49,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - public class PartitionedProducerImpl<T> extends ProducerBase<T> { private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class); @@ -52,7 +56,12 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { private List<ProducerImpl<T>> producers; private MessageRouter routerPolicy; private final ProducerStatsRecorderImpl stats; - private final TopicMetadata topicMetadata; + private TopicMetadata topicMetadata; + + // timeout related to auto check and subscribe partition increasement + private volatile Timeout partitionsAutoUpdateTimeout = null; + TopicsPartitionChangedListener topicsPartitionChangedListener; + CompletableFuture<Void> partitionsAutoUpdateFuture = null; public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors<T> interceptors) { @@ -66,6 +75,13 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { conf.getMaxPendingMessagesAcrossPartitions() / numPartitions); conf.setMaxPendingMessages(maxPendingMessages); start(); + + // start track and auto subscribe partition increasement + if (conf.isAutoUpdatePartitions()) { + topicsPartitionChangedListener = new TopicsPartitionChangedListener(); + partitionsAutoUpdateTimeout = client.timer() + .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES); + } } private MessageRouter getMessageRouter() { @@ -192,6 +208,11 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { } setState(State.Closing); + if (partitionsAutoUpdateTimeout != null) { + partitionsAutoUpdateTimeout.cancel(); + partitionsAutoUpdateTimeout = null; + } + AtomicReference<Throwable> closeFail = new AtomicReference<Throwable>(); AtomicInteger completed = new AtomicInteger(topicMetadata.numPartitions()); CompletableFuture<Void> closeFuture = new CompletableFuture<>(); @@ -244,4 +265,101 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { return "partition-producer"; } + // This listener is triggered when topics partitions are updated. + private class TopicsPartitionChangedListener implements PartitionsChangedListener { + // Check partitions changes of passed in topics, and add new topic partitions. + @Override + public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended) { + CompletableFuture<Void> future = new CompletableFuture<>(); + if (topicsExtended.isEmpty() || !topicsExtended.contains(topic)) { + future.complete(null); + return future; + } + + client.getPartitionsForTopic(topic).thenCompose(list -> { + int oldPartitionNumber = topicMetadata.numPartitions(); + int currentPartitionNumber = list.size(); + + if (log.isDebugEnabled()) { + log.debug("[{}] partitions number. old: {}, new: {}", + topic, oldPartitionNumber, currentPartitionNumber); + } + + if (oldPartitionNumber == currentPartitionNumber) { + // topic partition number not changed + future.complete(null); + return future; + } else if (oldPartitionNumber < currentPartitionNumber) { + List<CompletableFuture<Producer<T>>> futureList = list + .subList(oldPartitionNumber, currentPartitionNumber) + .stream() + .map(partitionName -> { + int partitionIndex = TopicName.getPartitionIndex(partitionName); + ProducerImpl<T> producer = + new ProducerImpl<>(client, + partitionName, conf, new CompletableFuture<>(), + partitionIndex, schema, interceptors); + producers.add(producer); + return producer.producerCreatedFuture(); + }).collect(Collectors.toList()); + + FutureUtil.waitForAll(futureList) + .thenAccept(finalFuture -> { + if (log.isDebugEnabled()) { + log.debug("[{}] success create producers for extended partitions. old: {}, new: {}", + topic, oldPartitionNumber, currentPartitionNumber); + } + topicMetadata = new TopicMetadataImpl(currentPartitionNumber); + future.complete(null); + }) + .exceptionally(ex -> { + // error happened, remove + log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}", + topic, oldPartitionNumber, currentPartitionNumber); + List<ProducerImpl<T>> sublist = producers.subList(oldPartitionNumber, producers.size()); + sublist.forEach(newProducer -> newProducer.closeAsync()); + sublist.clear(); + future.completeExceptionally(ex); + return null; + }); + return null; + } else { + log.error("[{}] not support shrink topic partitions. old: {}, new: {}", + topic, oldPartitionNumber, currentPartitionNumber); + future.completeExceptionally(new NotSupportedException("not support shrink topic partitions")); + } + return future; + }); + + return future; + } + } + + private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled() || getState() != State.Ready) { + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer: {}", topic); + } + + // if last auto update not completed yet, do nothing. + if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { + partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic)); + } + + // schedule the next re-check task + partitionsAutoUpdateTimeout = client.timer() + .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES); + } + }; + + @VisibleForTesting + public Timeout getPartitionsAutoUpdateTimeout() { + return partitionsAutoUpdateTimeout; + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionsChangedListener.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionsChangedListener.java new file mode 100644 index 0000000..48de279 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionsChangedListener.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Listener that notified when concerned topic partitions changed. + */ +public interface PartitionsChangedListener { + /** + * Notified when topic partitions increased. + * Passed in topics that have partitions increased. + */ + CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index a33b790..47e3236 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -103,7 +103,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T }); // schedule the next re-check task - client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this, + recheckPatternTimeout = client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this, Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 92526f5..193d034 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -255,6 +255,11 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> { interceptorList.addAll(Arrays.asList(interceptors)); return this; } + @Override + public ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate) { + conf.setAutoUpdatePartitions(autoUpdate); + return this; + } private void setMessageRoutingMode() throws PulsarClientException { if(conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 8e1a723..0f61cbb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -90,6 +90,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable { private DeadLetterPolicy deadLetterPolicy; + private boolean autoUpdatePartitions = true; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 611cafb..409962b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -73,6 +73,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { // Cannot use Optional<Long> since it's not serializable private Long initialSequenceId = null; + private boolean autoUpdatePartitions = true; + private SortedMap<String, String> properties = new TreeMap<>(); /**