This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a5df1a  Issue 3230:  auto update topic partitions extend for consumer 
and producer (#3513)
3a5df1a is described below

commit 3a5df1ae71a29873fdd91295763342b90f9bc224
Author: Jia Zhai <zhai...@apache.org>
AuthorDate: Fri Feb 8 04:51:47 2019 +0800

    Issue 3230:  auto update topic partitions extend for consumer and producer 
(#3513)
    
    * consumer auto subscribe
    
    * producer update for partition extends
    
    * change following Matteo's comments
    
    * enable by default
---
 .../api/PartitionedProducerConsumerTest.java       | 112 ++++++++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  88 ++++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   9 +
 .../apache/pulsar/client/api/ProducerBuilder.java  |   9 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   7 +-
 .../client/impl/MultiTopicsConsumerImpl.java       | 237 ++++++++++++++++-----
 .../client/impl/PartitionedProducerImpl.java       | 126 ++++++++++-
 .../client/impl/PartitionsChangedListener.java     |  33 +++
 .../impl/PatternMultiTopicsConsumerImpl.java       |   2 +-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   5 +
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../impl/conf/ProducerConfigurationData.java       |   2 +
 12 files changed, 578 insertions(+), 54 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 137c57d..55952b7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import io.netty.util.Timeout;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -667,6 +669,116 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
                 Collections.singletonList(nonPartitionedTopic));
     }
 
+
+    /**
+     * Verifies that the producer and consumer automatically pick up newly added partitions.
+     *
+     * Steps:
+     * 1. create topic with 2 partitions, and producer consumer
+     * 2. update partition from 2 to 3.
+     * 3. trigger auto update in producer, after produce, consumer will only 
get messages from 2 partitions.
+     * 4. trigger auto update in consumer, after produce, consumer will get 
all messages from 3 partitions.
+     *
+     * @throws Exception
+     */
+    @Test(timeOut = 30000)
+    public void testAutoUpdatePartitionsForProducerConsumer() throws Exception 
{
+        log.info("-- Starting {} test --", methodName);
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+
+        final int numPartitions = 2;
+        final String topicName = "persistent://my-property/my-ns/my-topic-" + 
System.currentTimeMillis();
+        final String producerMsg = "producerMsg";
+        final int totalMessages = 30;
+
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+            .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+            .enableBatching(false)
+            .autoUpdatePartitions(true)
+            .create();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+            .subscriptionName("my-partitioned-subscriber")
+            .subscriptionType(SubscriptionType.Shared)
+            .autoUpdatePartitions(true)
+            .subscribe();
+
+        // 1. produce and consume 2 partitions
+        for (int i = 0; i < totalMessages; i++) {
+            producer.send((producerMsg + " first round " + "message index: " + 
i).getBytes());
+        }
+        int messageSet = 0;
+        Message<byte[]> message = consumer.receive();
+        do {
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.info("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 2. update partition from 2 to 3.
+        admin.topics().updatePartitionedTopic(topicName,3);
+
+        // 3. trigger auto update in producer, after produce, consumer will 
get 2/3 messages.
+        log.info("trigger partitionsAutoUpdateTimerTask for producer");
+        Timeout timeout = 
((PartitionedProducerImpl<byte[]>)producer).getPartitionsAutoUpdateTimeout();
+        timeout.task().run(timeout);
+        Thread.sleep(200);
+
+        for (int i = 0; i < totalMessages; i++) {
+            producer.send((producerMsg + " second round " + "message index: " 
+ i).getBytes());
+        }
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.info("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages * 2 / 3);
+
+        // 4. trigger auto update in consumer, after produce, consumer will 
get all messages.
+        log.info("trigger partitionsAutoUpdateTimerTask for consumer");
+        timeout = 
((MultiTopicsConsumerImpl<byte[]>)consumer).getPartitionsAutoUpdateTimeout();
+        timeout.task().run(timeout);
+        Thread.sleep(200);
+
+        // previously produced messages: the second-round messages sent to the new partition
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.info("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages / 3);
+
+        // newly produced messages (third round)
+        for (int i = 0; i < totalMessages; i++) {
+            producer.send((producerMsg + " third round " + "message index: " + 
i).getBytes());
+        }
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.info("Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        pulsarClient.close();
+        admin.topics().deletePartitionedTopic(topicName);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+
     private class AlwaysTwoMessageRouter implements MessageRouter {
         @Override
         public int choosePartition(Message<?> msg, TopicMetadata metadata) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index b791033..fc46586 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -20,10 +20,12 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
+import io.netty.util.Timeout;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -672,4 +674,90 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         consumer.close();
     }
+
+
+    /**
+     * Test topic partitions auto subscribed.
+     *
+     * Steps:
+     * 1. Create a consumer with 2 topics, and each topic has 2 partitions: 
xx-partition-0, xx-partition-1.
+     * 2. produce message to xx-partition-2, and verify consumer could not 
receive message.
+     * 3. update topics to have 3 partitions.
+     * 4. trigger partitionsAutoUpdate. this should be done automatically, 
this is to save time to manually trigger.
+     * 5. produce message to xx-partition-2 again,  and verify consumer could 
receive message.
+     *
+     */
+    @Test(timeOut = 30000)
+    public void testTopicAutoUpdatePartitions() throws Exception {
+        String key = "TestTopicAutoUpdatePartitions";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 6;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + 
key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + 
key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2);
+
+        admin.tenants().createTenant("prop", new TenantInfo());
+        admin.topics().createPartitionedTopic(topicName1, 2);
+        admin.topics().createPartitionedTopic(topicName2, 2);
+
+        // 1. Create a  consumer
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+            .topics(topicNames)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .autoUpdatePartitions(true)
+            .subscribe();
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+
+        MultiTopicsConsumerImpl topicsConsumer = (MultiTopicsConsumerImpl) 
consumer;
+
+        // 2. use partition-2 producer,
+        Producer<byte[]> producer1 = 
pulsarClient.newProducer().topic(topicName1 + "-partition-2")
+            .enableBatching(false)
+            .create();
+        Producer<byte[]> producer2 = 
pulsarClient.newProducer().topic(topicName2 + "-partition-2")
+            .enableBatching(false)
+            .create();
+        for (int i = 0; i < totalMessages; i++) {
+            producer1.send((messagePredicate + "topic1-partition-2 index:" + 
i).getBytes());
+            producer2.send((messagePredicate + "topic2-partition-2 index:" + 
i).getBytes());
+            log.info("produce message to partition-2. message index: {}", i);
+        }
+        // since partition-2 not subscribed,  could not receive any message.
+        Message<byte[]> message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        assertNull(message);
+
+        // 3. update to 3 partitions
+        admin.topics().updatePartitionedTopic(topicName1, 3);
+        admin.topics().updatePartitionedTopic(topicName2, 3);
+
+        // 4. trigger partitionsAutoUpdate. This would happen automatically after 1 minute;
+        // it is triggered manually here to save time.
+        log.info("trigger partitionsAutoUpdateTimerTask");
+        Timeout timeout = topicsConsumer.getPartitionsAutoUpdateTimeout();
+        timeout.task().run(timeout);
+        Thread.sleep(200);
+
+        // 5. produce message to xx-partition-2 again,  and verify consumer 
could receive message.
+        for (int i = 0; i < totalMessages; i++) {
+            producer1.send((messagePredicate + "topic1-partition-2 index:" + 
i).getBytes());
+            producer2.send((messagePredicate + "topic2-partition-2 index:" + 
i).getBytes());
+            log.info("produce message to partition-2 again. messageindex: {}", 
i);
+        }
+        int messageSet = 0;
+        message = consumer.receive();
+        do {
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.info("4 Consumer acknowledged : " + new 
String(message.getData()));
+            message = consumer.receive(200, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, 2 * totalMessages);
+
+        consumer.close();
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 5a8613d..f940640 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -368,4 +368,13 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * then the ack timeout will be set to 30000 millisecond
      */
     ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
+
+    /**
+     * If enabled, the consumer will automatically subscribe to newly added partitions.
+     * This is only for partitioned consumer.
+     *
+     * @param autoUpdate
+     *            whether to automatically update when the number of partitions increases
+     */
+    ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index dd87685..3b01314 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -328,4 +328,13 @@ public interface ProducerBuilder<T> extends Cloneable {
      * @return producer builder.
      */
     ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);
+
+    /**
+     * If enabled, partitioned producer will auto create new producers for new 
partitions.
+     * This is only for partitioned producer.
+     *
+     * @param autoUpdate
+     *            whether to automatically update when the number of partitions increases
+     */
+    ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 09ba946..7e809ee 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -46,7 +46,6 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.util.FutureUtil;
 
 import com.google.common.collect.Lists;
@@ -293,6 +292,12 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> autoUpdatePartitions(boolean autoUpdate) {
+        conf.setAutoUpdatePartitions(autoUpdate);
+        return this;
+    }
+
     public ConsumerConfigurationData<T> getConf() {
         return conf;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e9ef632..39c0511 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -21,6 +21,10 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,12 +44,12 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -79,6 +83,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
     AtomicInteger allTopicPartitionsNumber;
 
+    // timeout used to periodically check for, and subscribe to, newly added partitions
+    private volatile Timeout partitionsAutoUpdateTimeout = null;
+    TopicsPartitionChangedListener topicsPartitionChangedListener;
+    CompletableFuture<Void> partitionsAutoUpdateFuture = null;
+
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final ConsumerStatsRecorder stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
@@ -111,6 +120,13 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         this.internalConfig = getInternalConsumerConfig();
         this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? 
new ConsumerStatsRecorderImpl() : null;
 
+        // start tracking partition increases and auto subscribing to newly added partitions
+        if (conf.isAutoUpdatePartitions()) {
+            topicsPartitionChangedListener = new 
TopicsPartitionChangedListener();
+            partitionsAutoUpdateTimeout = client.timer()
+                .newTimeout(partitionsAutoUpdateTimerTask, 1, 
TimeUnit.MINUTES);
+        }
+
         if (conf.getTopicNames().isEmpty()) {
             this.namespaceName = null;
             setState(State.Ready);
@@ -126,20 +142,16 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 .collect(Collectors.toList());
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
-                try {
-                    if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) 
{
-                        
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
-                    }
-                    setState(State.Ready);
-                    // We have successfully created N consumers, so we can 
start receiving messages now
-                    startReceivingMessages(new 
ArrayList<>(consumers.values()));
-                    subscribeFuture().complete(MultiTopicsConsumerImpl.this);
-                    log.info("[{}] [{}] Created topics consumer with {} 
sub-consumers",
-                        topic, subscription, allTopicPartitionsNumber.get());
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] Failed startReceivingMessages while 
subscribe topics: {}", topic, e.getMessage());
-                    subscribeFuture.completeExceptionally(e);
-                }})
+                if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
+                    setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+                }
+                setState(State.Ready);
+                // We have successfully created N consumers, so we can start 
receiving messages now
+                startReceivingMessages(new ArrayList<>(consumers.values()));
+                log.info("[{}] [{}] Created topics consumer with {} 
sub-consumers",
+                    topic, subscription, allTopicPartitionsNumber.get());
+                subscribeFuture().complete(MultiTopicsConsumerImpl.this);
+            })
             .exceptionally(ex -> {
                 log.warn("[{}] Failed to subscribe topics: {}", topic, 
ex.getMessage());
                 subscribeFuture.completeExceptionally(ex);
@@ -187,7 +199,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
     }
 
-    private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) 
throws PulsarClientException {
+    private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] startReceivingMessages for {} new consumers in 
topics consumer, state: {}",
                 topic, newConsumers.size(), getState());
@@ -431,6 +443,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
         setState(State.Closing);
 
+        if (partitionsAutoUpdateTimeout != null) {
+            partitionsAutoUpdateTimeout.cancel();
+            partitionsAutoUpdateTimeout = null;
+        }
+
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futureList = consumers.values().stream()
             .map(c -> c.closeAsync()).collect(Collectors.toList());
@@ -719,39 +736,35 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         FutureUtil.waitForAll(futureList)
             .thenAccept(finalFuture -> {
-                try {
-                    if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) 
{
-                        
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
-                    }
-                    int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
-                    checkState(allTopicPartitionsNumber.get() == numTopics,
-                        "allTopicPartitionsNumber " + 
allTopicPartitionsNumber.get()
-                            + " not equals expected: " + numTopics);
-
-                    // We have successfully created new consumers, so we can 
start receiving messages for them
-                    startReceivingMessages(
-                        consumers.values().stream()
-                            .filter(consumer1 -> {
-                                String consumerTopicName = 
consumer1.getTopic();
-                                if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-                                    
TopicName.get(topicName).getPartitionedTopicName().toString())) {
-                                    return true;
-                                } else {
-                                    return false;
-                                }
-                            })
-                            .collect(Collectors.toList()));
-
-                    subscribeResult.complete(null);
-                    log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
-                        topic, subscription, topicName, numPartitions, 
allTopicPartitionsNumber.get());
-                    if (this.namespaceName == null) {
-                        this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
-                    }
-                    return;
-                } catch (PulsarClientException e) {
-                    handleSubscribeOneTopicError(topicName, e, 
subscribeResult);
+                if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
+                    setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
                 }
+                int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
+                checkState(allTopicPartitionsNumber.get() == numTopics,
+                    "allTopicPartitionsNumber " + 
allTopicPartitionsNumber.get()
+                        + " not equals expected: " + numTopics);
+
+                // We have successfully created new consumers, so we can start 
receiving messages for them
+                startReceivingMessages(
+                    consumers.values().stream()
+                        .filter(consumer1 -> {
+                            String consumerTopicName = consumer1.getTopic();
+                            if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+                                
TopicName.get(topicName).getPartitionedTopicName().toString())) {
+                                return true;
+                            } else {
+                                return false;
+                            }
+                        })
+                        .collect(Collectors.toList()));
+
+                subscribeResult.complete(null);
+                log.info("[{}] [{}] Success subscribe new topic {} in topics 
consumer, partitions: {}, allTopicPartitionsNumber: {}",
+                    topic, subscription, topicName, numPartitions, 
allTopicPartitionsNumber.get());
+                if (this.namespaceName == null) {
+                    this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
+                }
+                return;
             })
             .exceptionally(ex -> {
                 handleSubscribeOneTopicError(topicName, ex, subscribeResult);
@@ -800,6 +813,11 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 new PulsarClientException.AlreadyClosedException("Topics 
Consumer was already closed"));
         }
 
+        if (partitionsAutoUpdateTimeout != null) {
+            partitionsAutoUpdateTimeout.cancel();
+            partitionsAutoUpdateTimeout = null;
+        }
+
         CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
         String topicPartName = 
TopicName.get(topicName).getPartitionedTopicName();
 
@@ -867,5 +885,128 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         consumers.forEach((name, consumer) -> consumer.resume());
     }
 
+    // This listener is triggered when topics partitions are updated.
+    private class TopicsPartitionChangedListener implements 
PartitionsChangedListener {
+        // Check partitions changes of passed in topics, and subscribe new 
added partitions.
+        @Override
+        public CompletableFuture<Void> onTopicsExtended(Collection<String> 
topicsExtended) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            if (topicsExtended.isEmpty()) {
+                future.complete(null);
+                return future;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}]  run onTopicsExtended: {}, size: {}",
+                    topic, topicsExtended.toString(), topicsExtended.size());
+            }
+
+            List<CompletableFuture<Void>> futureList = 
Lists.newArrayListWithExpectedSize(topicsExtended.size());
+            topicsExtended.forEach(topic -> 
futureList.add(subscribeIncreasedTopicPartitions(topic)));
+            FutureUtil.waitForAll(futureList)
+                .thenAccept(finalFuture -> future.complete(null))
+                .exceptionally(ex -> {
+                    log.warn("[{}] Failed to subscribe increased topics 
partitions: {}", topic, ex.getMessage());
+                    future.completeExceptionally(ex);
+                    return null;
+                });
+
+            return future;
+        }
+    }
+
+    // subscribe increased partitions for a given topic
+    private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String 
topicName) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        client.getPartitionsForTopic(topicName).thenCompose(list -> {
+            int oldPartitionNumber = topics.get(topicName.toString());
+            int currentPartitionNumber = list.size();
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] partitions number. old: {}, new: {}",
+                    topicName.toString(), oldPartitionNumber, 
currentPartitionNumber);
+            }
+
+            if (oldPartitionNumber == currentPartitionNumber) {
+                // topic partition number not changed
+                future.complete(null);
+                return future;
+            } else if (oldPartitionNumber < currentPartitionNumber) {
+                List<String> newPartitions = list.subList(oldPartitionNumber, 
currentPartitionNumber);
+                // subscribe new added partitions
+                List<CompletableFuture<Consumer<T>>> futureList = newPartitions
+                    .stream()
+                    .map(partitionName -> {
+                        int partitionIndex = 
TopicName.getPartitionIndex(partitionName);
+                        CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+                        ConsumerConfigurationData<T> configurationData = 
getInternalConsumerConfig();
+                        ConsumerImpl<T> newConsumer = new ConsumerImpl<>(
+                            client, partitionName, configurationData,
+                            client.externalExecutorProvider().getExecutor(),
+                            partitionIndex, subFuture, schema, interceptors);
+                        consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] create consumer {} for 
partitionName: {}",
+                                topicName.toString(), newConsumer.getTopic(), 
partitionName);
+                        }
+                        return subFuture;
+                    })
+                    .collect(Collectors.toList());
+
+                // wait for all partitions subscribe future complete, then 
startReceivingMessages
+                FutureUtil.waitForAll(futureList)
+                    .thenAccept(finalFuture -> {
+                        List<ConsumerImpl<T>> newConsumerList = 
newPartitions.stream()
+                            .map(partitionTopic -> 
consumers.get(partitionTopic))
+                            .collect(Collectors.toList());
+
+                        startReceivingMessages(newConsumerList);
+                        future.complete(null);
+                    })
+                    .exceptionally(ex -> {
+                        log.warn("[{}] Failed to subscribe {} partition: {} - 
{}",
+                            topic, topicName.toString(), oldPartitionNumber, 
currentPartitionNumber, ex.getMessage());
+                        future.completeExceptionally(ex);
+                        return null;
+                    });
+            } else {
+                log.error("[{}] not support shrink topic partitions. old: {}, 
new: {}",
+                    topicName.toString(), oldPartitionNumber, 
currentPartitionNumber);
+                future.completeExceptionally(new NotSupportedException("not 
support shrink topic partitions"));
+            }
+            return future;
+        });
+
+        return future;
+    }
+
+    private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            if (timeout.isCancelled() || getState() != State.Ready) {
+                return;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}]  run partitionsAutoUpdateTimerTask for 
multiTopicsConsumer: {}", topic);
+            }
+
+            // if last auto update not completed yet, do nothing.
+            if (partitionsAutoUpdateFuture == null || 
partitionsAutoUpdateFuture.isDone()) {
+                partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(topics.keySet());
+            }
+
+            // schedule the next re-check task
+            partitionsAutoUpdateTimeout = client.timer()
+                .newTimeout(partitionsAutoUpdateTimerTask, 1, 
TimeUnit.MINUTES);
+        }
+    };
+
+    @VisibleForTesting
+    public Timeout getPartitionsAutoUpdateTimeout() {
+        return partitionsAutoUpdateTimeout;
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 9b862df..bfc6645 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -21,13 +21,18 @@ package org.apache.pulsar.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
 import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -35,6 +40,7 @@ import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -43,8 +49,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class PartitionedProducerImpl<T> extends ProducerBase<T> {
 
     private static final Logger log = 
LoggerFactory.getLogger(PartitionedProducerImpl.class);
@@ -52,7 +56,12 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
     private List<ProducerImpl<T>> producers;
     private MessageRouter routerPolicy;
     private final ProducerStatsRecorderImpl stats;
-    private final TopicMetadata topicMetadata;
+    private TopicMetadata topicMetadata;
+
+    // timeout of the periodic task that checks for added partitions and creates producers for them
+    private volatile Timeout partitionsAutoUpdateTimeout = null;
+    TopicsPartitionChangedListener topicsPartitionChangedListener;
+    CompletableFuture<Void> partitionsAutoUpdateFuture = null;
 
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf, int numPartitions,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema, ProducerInterceptors<T> interceptors) {
@@ -66,6 +75,13 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
                 conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
         conf.setMaxPendingMessages(maxPendingMessages);
         start();
+
+        // start tracking partition increases and automatically create producers for new partitions
+        if (conf.isAutoUpdatePartitions()) {
+            topicsPartitionChangedListener = new 
TopicsPartitionChangedListener();
+            partitionsAutoUpdateTimeout = client.timer()
+                .newTimeout(partitionsAutoUpdateTimerTask, 1, 
TimeUnit.MINUTES);
+        }
     }
 
     private MessageRouter getMessageRouter() {
@@ -192,6 +208,11 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
         }
         setState(State.Closing);
 
+        if (partitionsAutoUpdateTimeout != null) {
+            partitionsAutoUpdateTimeout.cancel();
+            partitionsAutoUpdateTimeout = null;
+        }
+
         AtomicReference<Throwable> closeFail = new 
AtomicReference<Throwable>();
         AtomicInteger completed = new 
AtomicInteger(topicMetadata.numPartitions());
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
@@ -244,4 +265,101 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
         return "partition-producer";
     }
 
+    // This listener is triggered when topics partitions are updated.
+    private class TopicsPartitionChangedListener implements 
PartitionsChangedListener {
+        // Check partitions changes of passed in topics, and add new topic 
partitions.
+        @Override
+        public CompletableFuture<Void> onTopicsExtended(Collection<String> 
topicsExtended) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            if (topicsExtended.isEmpty() || !topicsExtended.contains(topic)) {
+                future.complete(null);
+                return future;
+            }
+
+            client.getPartitionsForTopic(topic).thenCompose(list -> {
+                int oldPartitionNumber = topicMetadata.numPartitions();
+                int currentPartitionNumber = list.size();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] partitions number. old: {}, new: {}",
+                        topic, oldPartitionNumber, currentPartitionNumber);
+                }
+
+                if (oldPartitionNumber == currentPartitionNumber) {
+                    // topic partition number not changed
+                    future.complete(null);
+                    return future;
+                } else if (oldPartitionNumber < currentPartitionNumber) {
+                    List<CompletableFuture<Producer<T>>> futureList = list
+                        .subList(oldPartitionNumber, currentPartitionNumber)
+                        .stream()
+                        .map(partitionName -> {
+                            int partitionIndex = 
TopicName.getPartitionIndex(partitionName);
+                            ProducerImpl<T> producer =
+                                new ProducerImpl<>(client,
+                                    partitionName, conf, new 
CompletableFuture<>(),
+                                    partitionIndex, schema, interceptors);
+                            producers.add(producer);
+                            return producer.producerCreatedFuture();
+                        }).collect(Collectors.toList());
+
+                    FutureUtil.waitForAll(futureList)
+                        .thenAccept(finalFuture -> {
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] success create producers for 
extended partitions. old: {}, new: {}",
+                                    topic, oldPartitionNumber, 
currentPartitionNumber);
+                            }
+                            topicMetadata = new 
TopicMetadataImpl(currentPartitionNumber);
+                            future.complete(null);
+                        })
+                        .exceptionally(ex -> {
+                            // error happened, remove
+                            log.warn("[{}] fail create producers for extended 
partitions. old: {}, new: {}",
+                                topic, oldPartitionNumber, 
currentPartitionNumber);
+                            List<ProducerImpl<T>> sublist = 
producers.subList(oldPartitionNumber, producers.size());
+                            sublist.forEach(newProducer -> 
newProducer.closeAsync());
+                            sublist.clear();
+                            future.completeExceptionally(ex);
+                            return null;
+                        });
+                    return null;
+                } else {
+                    log.error("[{}] not support shrink topic partitions. old: 
{}, new: {}",
+                        topic, oldPartitionNumber, currentPartitionNumber);
+                    future.completeExceptionally(new 
NotSupportedException("not support shrink topic partitions"));
+                }
+                return future;
+            });
+
+            return future;
+        }
+    }
+
+    /**
+     * Timer task that periodically asks {@code topicsPartitionChangedListener} to check this
+     * producer's topic for added partitions, then re-arms itself one minute later. The task
+     * stops rescheduling once its timeout is cancelled or the producer leaves the
+     * {@code Ready} state.
+     */
+    private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            // Returning without rescheduling ends the periodic check.
+            if (timeout.isCancelled() || getState() != State.Ready) {
+                return;
+            }
+
+            if (log.isDebugEnabled()) {
+                // One placeholder for one argument; the original had a dangling second "{}".
+                log.debug("[{}]  run partitionsAutoUpdateTimerTask for partitioned producer", topic);
+            }
+
+            // if last auto update not completed yet, do nothing.
+            if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
+                partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+            }
+
+            // schedule the next re-check task
+            partitionsAutoUpdateTimeout = client.timer()
+                .newTimeout(partitionsAutoUpdateTimerTask, 1, TimeUnit.MINUTES);
+        }
+    };
+
+    /**
+     * Returns the timeout handle of the currently scheduled partitions auto-update check.
+     * Exposed for tests only. The value is null when auto update was not enabled or after
+     * the handle has been cleared on close.
+     */
+    @VisibleForTesting
+    public Timeout getPartitionsAutoUpdateTimeout() {
+        return partitionsAutoUpdateTimeout;
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionsChangedListener.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionsChangedListener.java
new file mode 100644
index 0000000..48de279
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionsChangedListener.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Listener that is notified when the partitions of the topics it is concerned with change.
+ */
+public interface PartitionsChangedListener {
+    /**
+     * Called when the number of partitions of one or more topics has increased.
+     *
+     * @param topicsExtended the topics whose partitions have increased
+     * @return a future for the handling of the change; presumably completed once the
+     *         implementation has finished reacting to it (verify against implementations)
+     */
+    CompletableFuture<Void> onTopicsExtended(Collection<String> 
topicsExtended);
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index a33b790..47e3236 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -103,7 +103,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T
         });
 
         // schedule the next re-check task
-        client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
+        recheckPatternTimeout = 
client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
             Math.min(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.MINUTES);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 92526f5..193d034 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -255,6 +255,11 @@ public class ProducerBuilderImpl<T> implements 
ProducerBuilder<T> {
         interceptorList.addAll(Arrays.asList(interceptors));
         return this;
     }
+    // Stores the auto-update-partitions flag on the producer configuration and returns
+    // this builder so calls can be chained.
+    @Override
+    public ProducerBuilder<T> autoUpdatePartitions(boolean autoUpdate) {
+        conf.setAutoUpdatePartitions(autoUpdate);
+        return this;
+    }
 
     private void setMessageRoutingMode() throws PulsarClientException {
         if(conf.getMessageRoutingMode() == null && 
conf.getCustomMessageRouter() == null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 8e1a723..0f61cbb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -90,6 +90,8 @@ public class ConsumerConfigurationData<T> implements 
Serializable, Cloneable {
 
     private DeadLetterPolicy deadLetterPolicy;
 
+    private boolean autoUpdatePartitions = true;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 611cafb..409962b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -73,6 +73,8 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
     // Cannot use Optional<Long> since it's not serializable
     private Long initialSequenceId = null;
 
+    private boolean autoUpdatePartitions = true;
+
     private SortedMap<String, String> properties = new TreeMap<>();
 
     /**

Reply via email to