This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dd9c43  Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead (#1365)
1dd9c43 is described below

commit 1dd9c43e8e266d3957177354f592015c6f89c6d6
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Tue Apr 3 22:03:55 2018 -0700

    Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead (#1365)
    
    * delete partitionedConsumer, use topicsConsumer instead
    
    * change following comments
    
    * rebase master, rename TopicsConsumerImpl to MultiTopicsConsumerImpl
    
    * avoid dup calling getPartitionedTopicMetadata
    
    * rebase master, fix test error
---
 .../broker/service/PersistentFailoverE2ETest.java  |   7 +-
 .../apache/pulsar/client/impl/MessageIdTest.java   |   7 +-
 .../client/impl/PatternTopicsConsumerImplTest.java |  76 +--
 .../PerMessageUnAcknowledgedRedeliveryTest.java    |   6 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  46 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 ...sumerImpl.java => MultiTopicsConsumerImpl.java} | 283 ++++++-----
 .../client/impl/PartitionedConsumerImpl.java       | 553 ---------------------
 ...pl.java => PatternMultiTopicsConsumerImpl.java} |  24 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   8 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |   6 +-
 11 files changed, 263 insertions(+), 755 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 6176821..08cd7f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -323,13 +324,11 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
         final String subName = "sub1";
         final int numMsgs = 100;
         Set<String> uniqueMessages = new HashSet<>();
-
         admin.persistentTopics().createPartitionedTopic(topicName, 
numPartitions);
 
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Failover);
 
-
         // 1. two consumers on the same subscription
         ActiveInactiveListenerEvent listener1 = new 
ActiveInactiveListenerEvent();
         ActiveInactiveListenerEvent listener2 = new 
ActiveInactiveListenerEvent();
@@ -374,7 +373,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             }
             totalMessages++;
             consumer1.acknowledge(msg);
-            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            MessageIdImpl msgId = (MessageIdImpl) 
(((TopicMessageImpl)msg).getInnerMessageId());
             receivedPtns.add(msgId.getPartitionIndex());
         }
 
@@ -391,7 +390,7 @@ public class PersistentFailoverE2ETest extends 
BrokerTestBase {
             }
             totalMessages++;
             consumer2.acknowledge(msg);
-            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            MessageIdImpl msgId = (MessageIdImpl) 
(((TopicMessageImpl)msg).getInnerMessageId());
             receivedPtns.add(msgId.getPartitionIndex());
         }
         assertTrue(Sets.difference(listener1.inactivePtns, 
receivedPtns).isEmpty());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 4a9912d..6d6fc92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -208,7 +208,8 @@ public class MessageIdTest extends BrokerTestBase {
         Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
 
         for (int i = 0; i < numberOfMessages; i++) {
-            MessageId messageId = consumer.receive().getMessageId();
+            MessageId topicMessageId = consumer.receive().getMessageId();
+            MessageId messageId = 
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
             log.info("Message ID Received = " + messageId);
             Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
         }
@@ -247,7 +248,9 @@ public class MessageIdTest extends BrokerTestBase {
         Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
 
         for (int i = 0; i < numberOfMessages; i++) {
-            
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed 
to receive Message");
+            MessageId topicMessageId = consumer.receive().getMessageId();
+            MessageId messageId = 
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
+            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
         }
         log.info("Message IDs = " + messageIds);
         Assert.assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 4f9cc70..b3e74f7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -161,13 +161,13 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .subscribe();
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        List<String> topics = ((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
-        List<ConsumerImpl<byte[]>> consumers = 
((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
         topics.forEach(topic -> log.debug("topic: {}", topic));
         consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -175,7 +175,7 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         IntStream.range(0, topics.size()).forEach(index ->
             
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
 
-        ((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> 
log.debug("getTopics topic: {}", topic));
+        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
 
         // 5. produce data
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -235,8 +235,8 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
         List<String> oldNames = Lists.newArrayList(topicName1, topicName2, 
topicName3, topicName4);
         List<String> newNames = Lists.newArrayList(topicName3, topicName4, 
topicName5, topicName6);
 
-        List<String> addedNames = 
PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
-        List<String> removedNames = 
PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
+        List<String> addedNames = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
+        List<String> removedNames = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
 
         assertTrue(addedNames.size() == 2 &&
             addedNames.contains(topicName5) &&
@@ -246,21 +246,21 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             removedNames.contains(topicName2));
 
         // totally 2 different list, should return content of first lists.
-        List<String> addedNames2 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
+        List<String> addedNames2 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
         assertTrue(addedNames2.size() == 2 &&
             addedNames2.contains(topicName5) &&
             addedNames2.contains(topicName6));
 
         // 2 same list, should return empty list.
-        List<String> addedNames3 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
+        List<String> addedNames3 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
         assertEquals(addedNames3.size(), 0);
 
         // empty list minus: addedNames2.size = 2, addedNames3.size = 0
-        List<String> addedNames4 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
+        List<String> addedNames4 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
         assertTrue(addedNames4.size() == addedNames2.size());
         addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));
 
-        List<String> addedNames5 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
+        List<String> addedNames5 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
         assertEquals(addedNames5.size(), 0);
     }
 
@@ -290,10 +290,10 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .subscribe();
 
         // 3. verify consumer get methods, to get 0 number of partitions and 
topics.
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 0);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 0);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 0);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 0);
 
         // 4. create producer
         String messagePredicate = "my-message-" + key + "-";
@@ -310,15 +310,15 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 5. call recheckTopics to subscribe each added topics above
         log.debug("recheck topics change");
-        PatternTopicsConsumerImpl<byte[]> consumer1 = 
((PatternTopicsConsumerImpl<byte[]>) consumer);
+        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
 
         // 6. verify consumer get methods, to get number of partitions and 
topics, value 6=1+2+3.
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
 
         // 7. produce data
@@ -384,13 +384,13 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
-        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+        assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 6 number of partitions and 
topics: 6=1+2+3
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
         // 5. produce data to topic 1,2,3; verify should receive all the 
message
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -419,12 +419,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 7. call recheckTopics to subscribe each added topics above, verify 
topics number: 10=1+2+3+4
         log.debug("recheck topics change");
-        PatternTopicsConsumerImpl<byte[]> consumer1 = 
((PatternTopicsConsumerImpl<byte[]>) consumer);
+        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 10);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 10);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 4);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 10);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 10);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 4);
 
         // 8. produce data to topic3 and topic4, verify should receive all the 
message
         for (int i = 0; i < totalMessages / 2; i++) {
@@ -487,13 +487,13 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
             .receiverQueueSize(4)
             .subscribe();
 
-        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+        assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 0 number of partitions and 
topics: 6=1+2+3
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
         // 5. produce data to topic 1,2,3; verify should receive all the 
message
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -521,12 +521,12 @@ public class PatternTopicsConsumerImplTest extends 
ProducerConsumerBase {
 
         // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics 
number: 2=6-1-3
         log.debug("recheck topics change");
-        PatternTopicsConsumerImpl<byte[]> consumer1 = 
((PatternTopicsConsumerImpl<byte[]>) consumer);
+        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
-        assertEquals(((PatternTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
-        assertEquals(((PatternTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
-        assertEquals(((PatternTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 1);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 1);
 
         // 8. produce data to topic2, verify should receive all the message
         for (int i = 0; i < totalMessages; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 7a94cc2..053cb5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -338,7 +338,7 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
     }
 
     private static long 
getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) {
-        PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>) 
c;
+        MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>) 
c;
         return pc.getUnAckedMessageTracker().size()
                 + pc.getConsumers().stream().mapToLong(consumer -> 
consumer.getUnAckedMessageTracker().size()).sum();
     }
@@ -405,8 +405,8 @@ public class PerMessageUnAcknowledgedRedeliveryTest extends 
BrokerTestBase {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((PartitionedConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
-        ((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
+        ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
+        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 952dfac..6aa574d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -116,10 +116,10 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
-        List<String> topics = ((TopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
-        List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) 
consumer).getConsumers();
+        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
 
         topics.forEach(topic -> log.info("topic: {}", topic));
         consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
@@ -127,7 +127,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         IntStream.range(0, 6).forEach(index ->
             
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
 
-        assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() 
== 3);
+        assertTrue(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size() == 3);
 
         consumer.unsubscribe();
         consumer.close();
@@ -167,7 +167,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -228,7 +228,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // Asynchronously produce messages
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -307,7 +307,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -323,7 +323,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             log.debug("Consumer received : " + new String(message.getData()));
             message = consumer.receive(500, TimeUnit.MILLISECONDS);
         }
-        long size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        long size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, totalMessages);
 
@@ -338,7 +338,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             message = consumer.receive(500, TimeUnit.MILLISECONDS);
         } while (message != null);
 
-        size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
         assertEquals(hSet.size(), totalMessages);
@@ -361,14 +361,14 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             consumer.acknowledge(message);
             message = consumer.receive(100, TimeUnit.MILLISECONDS);
         }
-        size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
         assertEquals(received, totalMessages);
 
         // 8. Simulate ackTimeout
-        ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
-        ((TopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> 
c.getUnAckedMessageTracker().toggle());
+        ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
+        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
 
         // 9. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -384,7 +384,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             log.debug("Consumer received : " + data);
             message = consumer.receive(100, TimeUnit.MILLISECONDS);
         }
-        size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 30);
 
@@ -402,7 +402,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             message = consumer.receive(100, TimeUnit.MILLISECONDS);
         }
         assertEquals(redelivered, 30);
-        size =  ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size =  ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.info(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
 
@@ -447,7 +447,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -468,7 +468,7 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(messageSet, totalMessages);
 
         // 4, unsubscribe topic3
-        CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl<byte[]>) 
consumer).unsubscribeAsync(topicName3);
+        CompletableFuture<Void> unsubFuture = 
((MultiTopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3);
         unsubFuture.get();
 
         // 5. producer publish messages
@@ -491,15 +491,15 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(messageSet, totalMessages * 2 / 3);
 
         // 7. use getter to verify internal topics number after un-subscribe 
topic3
-        List<String> topics = ((TopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
-        List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) 
consumer).getConsumers();
+        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
 
         assertEquals(topics.size(), 3);
         assertEquals(consumers.size(), 3);
-        assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() 
== 2);
+        assertTrue(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size() == 2);
 
         // 8. re-subscribe topic3
-        CompletableFuture<Void> subFuture = 
((TopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
+        CompletableFuture<Void> subFuture = 
((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
         subFuture.get();
 
         // 9. producer publish messages
@@ -522,12 +522,12 @@ public class TopicsConsumerImplTest extends 
ProducerConsumerBase {
         assertEquals(messageSet, totalMessages);
 
         // 11. use getter to verify internal topics number after subscribe 
topic3
-        topics = ((TopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
-        consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+        topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
-        assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() 
== 3);
+        assertTrue(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size() == 3);
 
         consumer.unsubscribe();
         consumer.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2cbd642..2a88bf0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -914,7 +914,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), 
getPartitionIndex());
             }
             if (partitionIndex != -1) {
-                // we should no longer track this message, 
PartitionedConsumerImpl will take care from now onwards
+                // we should no longer track this message, TopicsConsumer will 
take care from now onwards
                 unAckedMessageTracker.remove(id);
             } else {
                 unAckedMessageTracker.add(id);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
similarity index 74%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 1089cef..6da72ec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -56,7 +56,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
+public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     // All topics should be in same namespace
     protected NamespaceName namespaceName;
@@ -76,15 +76,15 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final int sharedQueueResumeThreshold;
 
     // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
-    AtomicInteger numberTopicPartitions;
+    AtomicInteger allTopicPartitionsNumber;
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final ConsumerStatsRecorder stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
-    TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> 
conf, ExecutorService listenerExecutor,
-                       CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
+    MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
+                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
         super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), conf,
                 Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, 
subscribeFuture, schema);
 
@@ -95,7 +95,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
         this.consumers = new ConcurrentHashMap<>();
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
-        this.numberTopicPartitions = new AtomicInteger(0);
+        this.allTopicPartitionsNumber = new AtomicInteger(0);
 
         if (conf.getAckTimeoutMillis() != 0) {
             this.unAckedMessageTracker = new 
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
@@ -109,7 +109,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
         if (conf.getTopicNames().isEmpty()) {
             this.namespaceName = null;
             setState(State.Ready);
-            subscribeFuture().complete(TopicsConsumerImpl.this);
+            subscribeFuture().complete(MultiTopicsConsumerImpl.this);
             return;
         }
 
@@ -122,15 +122,15 @@ public class TopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
                 try {
-                    if (numberTopicPartitions.get() > maxReceiverQueueSize) {
-                        setMaxReceiverQueueSize(numberTopicPartitions.get());
+                    if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) 
{
+                        
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
                     }
                     setState(State.Ready);
                     // We have successfully created N consumers, so we can 
start receiving messages now
                     
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
-                    subscribeFuture().complete(TopicsConsumerImpl.this);
+                    subscribeFuture().complete(MultiTopicsConsumerImpl.this);
                     log.info("[{}] [{}] Created topics consumer with {} 
sub-consumers",
-                        topic, subscription, numberTopicPartitions.get());
+                        topic, subscription, allTopicPartitionsNumber.get());
                 } catch (PulsarClientException e) {
                     log.warn("[{}] Failed startReceivingMessages while 
subscribe topics: {}", topic, e.getMessage());
                     subscribeFuture.completeExceptionally(e);
@@ -245,7 +245,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 // Enqueue the message so that it can be retrieved when 
application calls receive()
                 // Waits for the queue to have space for the message
-                // This should never block cause TopicsConsumerImpl should 
always use GrowableArrayBlockingQueue
+                // This should never block cause MultiTopicsConsumerImpl 
should always use GrowableArrayBlockingQueue
                 incomingMessages.put(topicMessage);
             }
         } catch (InterruptedException e) {
@@ -271,7 +271,7 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
                         log.debug("[{}][{}] Calling message listener for 
message {}",
                             topic, subscription, message.getMessageId());
                     }
-                    listener.received(TopicsConsumerImpl.this, msg);
+                    listener.received(MultiTopicsConsumerImpl.this, msg);
                 } catch (Throwable t) {
                     log.error("[{}][{}] Message listener error in processing 
message: {}",
                         topic, subscription, message, t);
@@ -613,117 +613,172 @@ public class TopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         }
 
         CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
-        final AtomicInteger partitionNumber = new AtomicInteger(0);
 
-        client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Received topic {} metadata.partitions: {}", 
topicName, metadata.partitions);
-            }
+        client.getPartitionedTopicMetadata(topicName)
+            .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, 
topicName, metadata.partitions))
+            .exceptionally(ex1 -> {
+                log.warn("[{}] Failed to get partitioned topic metadata: {}", 
topicName, ex1.getMessage());
+                subscribeResult.completeExceptionally(ex1);
+                return null;
+            });
 
-            List<CompletableFuture<Consumer<T>>> futureList;
-
-            if (metadata.partitions > 1) {
-                this.topics.putIfAbsent(topicName, metadata.partitions);
-                numberTopicPartitions.addAndGet(metadata.partitions);
-                partitionNumber.addAndGet(metadata.partitions);
-
-                futureList = IntStream
-                    .range(0, partitionNumber.get())
-                    .mapToObj(
-                        partitionIndex -> {
-                            String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
-                            CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                            ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, internalConfig,
-                                    
client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, 
schema);
-                            consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
-                            return subFuture;
-                        })
-                    .collect(Collectors.toList());
-            } else {
-                this.topics.putIfAbsent(topicName, 1);
-                numberTopicPartitions.incrementAndGet();
-                partitionNumber.incrementAndGet();
+        return subscribeResult;
+    }
 
-                CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, 
topicName, internalConfig,
-                        client.externalExecutorProvider().getExecutor(), 0, 
subFuture, schema);
-                consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+    // create consumer for a single topic with already known partitions.
+    // first create a consumer with no topic, then do subscription for already 
known partitionedTopic.
+    public static <T> MultiTopicsConsumerImpl<T> 
createPartitionedConsumer(PulsarClientImpl client,
+                                                                           
ConsumerConfigurationData<T> conf,
+                                                                           
ExecutorService listenerExecutor,
+                                                                           
CompletableFuture<Consumer<T>> subscribeFuture,
+                                                                           int 
numPartitions,
+                                                                           
Schema<T> schema) {
+        checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 
topic for partitioned consumer");
+
+        // get topic name, then remove it from conf, so constructor will 
create a consumer with no topic.
+        ConsumerConfigurationData cloneConf = conf.clone();
+        String topicName = cloneConf.getSingleTopic();
+        cloneConf.getTopicNames().remove(topicName);
+
+        CompletableFuture<Consumer> future = new CompletableFuture<>();
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, 
cloneConf, listenerExecutor, future, schema);
+
+        future.thenCompose(c -> 
((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
+            .thenRun(()-> subscribeFuture.complete(consumer))
+            .exceptionally(e -> {
+                log.warn("Failed subscription for createPartitionedConsumer: 
{} {}, e:{}",
+                    topicName, numPartitions,  e);
+                
subscribeFuture.completeExceptionally(((Throwable)e).getCause());
+                return null;
+            });;
+        return consumer;
+    }
 
-                futureList = Collections.singletonList(subFuture);
-            }
+    // subscribe to one more given topic whose number of partitions is already known
+    private CompletableFuture<Void> subscribeAsync(String topicName, int 
numberPartitions) {
+        if (!topicNameValid(topicName)) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topic name 
not valid"));
+        }
 
-            FutureUtil.waitForAll(futureList)
-                .thenAccept(finalFuture -> {
-                    try {
-                        if (numberTopicPartitions.get() > 
maxReceiverQueueSize) {
-                            
setMaxReceiverQueueSize(numberTopicPartitions.get());
-                        }
-                        int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
-                        checkState(numberTopicPartitions.get() == numTopics,
-                            "numberTopicPartitions " + 
numberTopicPartitions.get()
-                                + " not equals expected: " + numTopics);
-
-                        // We have successfully created new consumers, so we 
can start receiving messages for them
-                        startReceivingMessages(
-                            consumers.values().stream()
-                                .filter(consumer1 -> {
-                                    String consumerTopicName = 
consumer1.getTopic();
-                                    if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-                                        
TopicName.get(topicName).getPartitionedTopicName().toString())) {
-                                        return true;
-                                    } else {
-                                        return false;
-                                    }
-                                })
-                                .collect(Collectors.toList()));
-
-                        subscribeResult.complete(null);
-                        log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, numberTopicPartitions {}",
-                            topic, subscription, topicName, 
numberTopicPartitions.get());
-                        if (this.namespaceName == null) {
-                            this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
-                        }
-                        return;
-                    } catch (PulsarClientException e) {
-                        handleSubscribeOneTopicError(topicName, e);
-                        subscribeResult.completeExceptionally(e);
-                    }
-                })
-                .exceptionally(ex -> {
-                    handleSubscribeOneTopicError(topicName, ex);
-                    subscribeResult.completeExceptionally(ex);
-                    return null;
-                });
-        }).exceptionally(ex1 -> {
-            log.warn("[{}] Failed to get partitioned topic metadata: {}", 
topicName, ex1.getMessage());
-            subscribeResult.completeExceptionally(ex1);
-            return null;
-        });
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics 
Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+        subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
 
         return subscribeResult;
     }
 
-    // handling failure during subscribe new topic, unsubscribe success 
created partitions
-    private void handleSubscribeOneTopicError(String topicName, Throwable 
error) {
-        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer 
", topic, topicName, error.getMessage());
+    private void subscribeTopicPartitions(CompletableFuture<Void> 
subscribeResult, String topicName, int partitionNumber) {
+        if (log.isDebugEnabled()) {
+            log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, partitionNumber);
+        }
 
-        consumers.values().stream().filter(consumer1 -> {
-            String consumerTopicName = consumer1.getTopic();
-            if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
-                return true;
-            } else {
-                return false;
-            }
-        }).forEach(consumer2 ->  {
-            consumer2.closeAsync().handle((ok, closeException) -> {
-                consumer2.subscribeFuture().completeExceptionally(error);
+        List<CompletableFuture<Consumer<T>>> futureList;
+
+        if (partitionNumber > 1) {
+            this.topics.putIfAbsent(topicName, partitionNumber);
+            allTopicPartitionsNumber.addAndGet(partitionNumber);
+
+            futureList = IntStream
+                .range(0, partitionNumber)
+                .mapToObj(
+                    partitionIndex -> {
+                        String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
+                        CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, internalConfig,
+                            client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema);
+                        consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
+                        return subFuture;
+                    })
+                .collect(Collectors.toList());
+        } else {
+            this.topics.putIfAbsent(topicName, 1);
+            allTopicPartitionsNumber.incrementAndGet();
+
+            CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+            ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, 
topicName, internalConfig,
+                client.externalExecutorProvider().getExecutor(), 0, subFuture, 
schema);
+            consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+            futureList = Collections.singletonList(subFuture);
+        }
+
+        FutureUtil.waitForAll(futureList)
+            .thenAccept(finalFuture -> {
+                try {
+                    if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) 
{
+                        
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+                    }
+                    int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
+                    checkState(allTopicPartitionsNumber.get() == numTopics,
+                        "allTopicPartitionsNumber " + 
allTopicPartitionsNumber.get()
+                            + " not equals expected: " + numTopics);
+
+                    // We have successfully created new consumers, so we can 
start receiving messages for them
+                    startReceivingMessages(
+                        consumers.values().stream()
+                            .filter(consumer1 -> {
+                                String consumerTopicName = 
consumer1.getTopic();
+                                if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+                                    
TopicName.get(topicName).getPartitionedTopicName().toString())) {
+                                    return true;
+                                } else {
+                                    return false;
+                                }
+                            })
+                            .collect(Collectors.toList()));
+
+                    subscribeResult.complete(null);
+                    log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
+                        topic, subscription, topicName, partitionNumber, 
allTopicPartitionsNumber.get());
+                    if (this.namespaceName == null) {
+                        this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
+                    }
+                    return;
+                } catch (PulsarClientException e) {
+                    handleSubscribeOneTopicError(topicName, e, 
subscribeResult);
+                }
+            })
+            .exceptionally(ex -> {
+                handleSubscribeOneTopicError(topicName, ex, subscribeResult);
                 return null;
             });
-            consumers.remove(consumer2.getTopic());
-        });
+    }
 
-        topics.remove(topicName);
-        checkState(numberTopicPartitions.get() == consumers.values().size());
+    // handle a failure while subscribing to a new topic: unsubscribe the 
successfully created partitions
+    private void handleSubscribeOneTopicError(String topicName, Throwable 
error, CompletableFuture<Void> subscribeFuture) {
+        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer 
{}", topic, topicName, error.getMessage());
+
+        client.externalExecutorProvider().getExecutor().submit(() -> {
+            AtomicInteger toCloseNum = new AtomicInteger(0);
+            consumers.values().stream().filter(consumer1 -> {
+                String consumerTopicName = consumer1.getTopic();
+                if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }).forEach(consumer2 -> {
+                toCloseNum.incrementAndGet();
+                consumer2.closeAsync().whenComplete((r, ex) -> {
+                    consumer2.subscribeFuture().completeExceptionally(error);
+                    allTopicPartitionsNumber.decrementAndGet();
+                    consumers.remove(consumer2.getTopic());
+                    if (toCloseNum.decrementAndGet() == 0) {
+                        log.warn("[{}] Failed to subscribe for topic [{}] in 
topics consumer, subscribe error: {}",
+                            topic, topicName, error.getMessage());
+                        topics.remove(topicName);
+                        checkState(allTopicPartitionsNumber.get() == 
consumers.values().size());
+                        subscribeFuture.completeExceptionally(error);
+                    }
+                    return;
+                });
+            });
+        });
     }
 
     // un-subscribe a given topic
@@ -757,15 +812,15 @@ public class TopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     consumersToUnsub.forEach(consumer1 -> {
                         consumers.remove(consumer1.getTopic());
                         pausedConsumers.remove(consumer1);
-                        numberTopicPartitions.decrementAndGet();
+                        allTopicPartitionsNumber.decrementAndGet();
                     });
 
                     topics.remove(topicName);
                     ((UnAckedTopicMessageTracker) 
unAckedMessageTracker).removeTopicMessages(topicName);
 
                     unsubscribeFuture.complete(null);
-                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, 
numberTopicPartitions: {}",
-                        topicName, subscription, consumerName, 
numberTopicPartitions);
+                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, 
allTopicPartitionsNumber: {}",
+                        topicName, subscription, consumerName, 
allTopicPartitionsNumber);
                 } else {
                     unsubscribeFuture.completeExceptionally(ex);
                     setState(State.Failed);
@@ -792,5 +847,5 @@ public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
         return consumers.values().stream().collect(Collectors.toList());
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(TopicsConsumerImpl.class);
+    private static final Logger log = 
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
deleted file mode 100644
index 9c952dd..0000000
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ /dev/null
@@ -1,553 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
-
-    private final List<ConsumerImpl<T>> consumers;
-
-    // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
-    // shared incoming queue was full
-    private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
-
-    // Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
-    // resume receiving from the paused consumer partitions
-    private final int sharedQueueResumeThreshold;
-
-    private final int numPartitions;
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ConsumerStatsRecorderImpl stats;
-    private final UnAckedMessageTracker unAckedMessageTracker;
-
-    PartitionedConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf, int numPartitions,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, 
numPartitions), conf.getReceiverQueueSize()),
-                listenerExecutor, subscribeFuture, schema);
-        this.consumers = Lists.newArrayListWithCapacity(numPartitions);
-        this.pausedConsumers = new ConcurrentLinkedQueue<>();
-        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
-        this.numPartitions = numPartitions;
-
-        if (conf.getAckTimeoutMillis() != 0) {
-            this.unAckedMessageTracker = new UnAckedMessageTracker(client, 
this, conf.getAckTimeoutMillis());
-        } else {
-            this.unAckedMessageTracker = 
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
-        }
-
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new 
ConsumerStatsRecorderImpl() : null;
-        checkArgument(conf.getReceiverQueueSize() > 0,
-                "Receiver queue size needs to be greater than 0 for 
Partitioned Topics");
-        start();
-    }
-
-    private void start() {
-        AtomicReference<Throwable> subscribeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new AtomicInteger();
-        ConsumerConfigurationData<T> internalConfig = 
getInternalConsumerConfig();
-        for (int partitionIndex = 0; partitionIndex < numPartitions; 
partitionIndex++) {
-            String partitionName = 
TopicName.get(topic).getPartition(partitionIndex).toString();
-            ConsumerImpl<T> consumer = new ConsumerImpl<>(client, 
partitionName, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), 
partitionIndex, new CompletableFuture<>(), schema);
-            consumers.add(consumer);
-            consumer.subscribeFuture().handle((cons, subscribeException) -> {
-                if (subscribeException != null) {
-                    setState(State.Failed);
-                    subscribeFail.compareAndSet(null, subscribeException);
-                    client.cleanupConsumer(this);
-                }
-                if (completed.incrementAndGet() == numPartitions) {
-                    if (subscribeFail.get() == null) {
-                        try {
-                            // We have successfully created N consumers, so we 
can start receiving messages now
-                            starReceivingMessages();
-                            setState(State.Ready);
-                            
subscribeFuture().complete(PartitionedConsumerImpl.this);
-                            log.info("[{}] [{}] Created partitioned consumer", 
topic, subscription);
-                            return null;
-                        } catch (PulsarClientException e) {
-                            subscribeFail.set(e);
-                        }
-                    }
-                    closeAsync().handle((ok, closeException) -> {
-                        
subscribeFuture().completeExceptionally(subscribeFail.get());
-                        client.cleanupConsumer(this);
-                        return null;
-                    });
-                    log.error("[{}] [{}] Could not create partitioned 
consumer.", topic, subscription,
-                            subscribeFail.get().getCause());
-                }
-                return null;
-            });
-        }
-    }
-
-    private void starReceivingMessages() throws PulsarClientException {
-        for (ConsumerImpl<T> consumer : consumers) {
-            
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
-            receiveMessageFromConsumer(consumer);
-        }
-    }
-
-    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAccept(message -> {
-            // Process the message, add to the queue and trigger listener or 
async callback
-            messageReceived(message);
-
-            // we're modifying pausedConsumers
-            lock.writeLock().lock();
-            try {
-                int size = incomingMessages.size();
-                if (size >= maxReceiverQueueSize
-                        || (size > sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty())) {
-                    // mark this consumer to be resumed later: if No more 
space left in shared queue,
-                    // or if any consumer is already paused (to create fair 
chance for already paused consumers)
-                    pausedConsumers.add(consumer);
-                } else {
-                    // Schedule next receiveAsync() if the incoming queue is 
not full. Use a different thread to avoid
-                    // recursion and stack overflow
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            } finally {
-                lock.writeLock().unlock();
-            }
-        });
-    }
-
-    private void resumeReceivingFromPausedConsumersIfNeeded() {
-        lock.readLock().lock();
-        try {
-            if (incomingMessages.size() <= sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty()) {
-                while (true) {
-                    ConsumerImpl<T> consumer = pausedConsumers.poll();
-                    if (consumer == null) {
-                        break;
-                    }
-
-                    // if messages are readily available on consumer we will 
attempt to writeLock on the same thread
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            }
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    @Override
-    protected Message<T> internalReceive() throws PulsarClientException {
-        Message<T> message;
-        try {
-            message = incomingMessages.take();
-            unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
-            resumeReceivingFromPausedConsumersIfNeeded();
-            return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
-        Message<T> message;
-        try {
-            message = incomingMessages.poll(timeout, unit);
-            if (message != null) {
-                unAckedMessageTracker.add(message.getMessageId());
-            }
-            resumeReceivingFromPausedConsumersIfNeeded();
-            return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    protected CompletableFuture<Message<T>> internalReceiveAsync() {
-        CompletableFuture<Message<T>> result = new CompletableFuture<>();
-        Message<T> message;
-        try {
-            lock.writeLock().lock();
-            message = incomingMessages.poll(0, TimeUnit.SECONDS);
-            if (message == null) {
-                pendingReceives.add(result);
-            } else {
-                unAckedMessageTracker.add(message.getMessageId());
-                resumeReceivingFromPausedConsumersIfNeeded();
-                result.complete(message);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            result.completeExceptionally(new PulsarClientException(e));
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        return result;
-    }
-
-    @Override
-    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
-                                                    Map<String,Long> 
properties) {
-        checkArgument(messageId instanceof MessageIdImpl);
-
-        if (getState() != State.Ready) {
-            return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
-        }
-
-        if (ackType == AckType.Cumulative) {
-            return FutureUtil.failedFuture(new 
PulsarClientException.NotSupportedException(
-                    "Cumulative acknowledge not supported for partitioned 
topics"));
-        } else {
-
-            ConsumerImpl<T> consumer = consumers.get(((MessageIdImpl) 
messageId).getPartitionIndex());
-            return consumer.doAcknowledge(messageId, ackType, 
properties).thenRun(() ->
-                    unAckedMessageTracker.remove(messageId));
-        }
-
-    }
-
-    @Override
-    public CompletableFuture<Void> unsubscribeAsync() {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil.failedFuture(
-                    new 
PulsarClientException.AlreadyClosedException("Partitioned Consumer was already 
closed"));
-        }
-        setState(State.Closing);
-
-        AtomicReference<Throwable> unsubscribeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new AtomicInteger(numPartitions);
-        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        for (Consumer<T> consumer : consumers) {
-            if (consumer != null) {
-                consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
-                    if (ex != null) {
-                        unsubscribeFail.compareAndSet(null, ex);
-                    }
-                    if (completed.decrementAndGet() == 0) {
-                        if (unsubscribeFail.get() == null) {
-                            setState(State.Closed);
-                            unAckedMessageTracker.close();
-                            unsubscribeFuture.complete(null);
-                            log.info("[{}] [{}] Unsubscribed Partitioned 
Consumer", topic, subscription);
-                        } else {
-                            setState(State.Failed);
-                            
unsubscribeFuture.completeExceptionally(unsubscribeFail.get());
-                            log.error("[{}] [{}] Could not unsubscribe 
Partitioned Consumer", topic, subscription,
-                                    unsubscribeFail.get().getCause());
-                        }
-                    }
-
-                    return null;
-                });
-            }
-
-        }
-
-        return unsubscribeFuture;
-    }
-
-    @Override
-    public CompletableFuture<Void> closeAsync() {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            unAckedMessageTracker.close();
-            return CompletableFuture.completedFuture(null);
-        }
-        setState(State.Closing);
-
-        AtomicReference<Throwable> closeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new AtomicInteger(numPartitions);
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-        for (Consumer<T> consumer : consumers) {
-            if (consumer != null) {
-                consumer.closeAsync().handle((closed, ex) -> {
-                    if (ex != null) {
-                        closeFail.compareAndSet(null, ex);
-                    }
-                    if (completed.decrementAndGet() == 0) {
-                        if (closeFail.get() == null) {
-                            setState(State.Closed);
-                            unAckedMessageTracker.close();
-                            closeFuture.complete(null);
-                            log.info("[{}] [{}] Closed Partitioned Consumer", 
topic, subscription);
-                            client.cleanupConsumer(this);
-                            // fail all pending-receive futures to notify 
application
-                            failPendingReceive();
-                        } else {
-                            setState(State.Failed);
-                            closeFuture.completeExceptionally(closeFail.get());
-                            log.error("[{}] [{}] Could not close Partitioned 
Consumer", topic, subscription,
-                                    closeFail.get().getCause());
-                        }
-                    }
-
-                    return null;
-                });
-            }
-
-        }
-
-        return closeFuture;
-    }
-
-    private void failPendingReceive() {
-        lock.readLock().lock();
-        try {
-            if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
-                while (!pendingReceives.isEmpty()) {
-                    CompletableFuture<Message<T>> receiveFuture = 
pendingReceives.poll();
-                    if (receiveFuture != null) {
-                        receiveFuture.completeExceptionally(
-                                new 
PulsarClientException.AlreadyClosedException("Consumer is already closed"));
-                    } else {
-                        break;
-                    }
-                }
-            }
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return consumers.stream().allMatch(ConsumerImpl::isConnected);
-    }
-
-    void messageReceived(Message<T> message) {
-        lock.writeLock().lock();
-        try {
-            unAckedMessageTracker.add(message.getMessageId());
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received message from partitioned-consumer 
{}", topic, subscription, message.getMessageId());
-            }
-            // if asyncReceive is waiting : return message to callback without 
adding to incomingMessages queue
-            if (!pendingReceives.isEmpty()) {
-                CompletableFuture<Message<T>> receivedFuture = 
pendingReceives.poll();
-                listenerExecutor.execute(() -> 
receivedFuture.complete(message));
-            } else {
-                // Enqueue the message so that it can be retrieved when 
application calls receive()
-                // Waits for the queue to have space for the message
-                // This should never block cause PartitonedConsumerImpl should 
always use GrowableArrayBlockingQueue
-                incomingMessages.put(message);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        if (listener != null) {
-            // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-            // thread while the message processing happens
-            listenerExecutor.execute(() -> {
-                Message<T> msg;
-                try {
-                    msg = internalReceive();
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                    return;
-                }
-
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}][{}] Calling message listener for 
message {}", topic, subscription, message.getMessageId());
-                    }
-                    listener.received(PartitionedConsumerImpl.this, msg);
-                } catch (Throwable t) {
-                    log.error("[{}][{}] Message listener error in processing 
message: {}", topic, subscription, message,
-                            t);
-                }
-            });
-        }
-    }
-
-    @Override
-    String getHandlerName() {
-        return subscription;
-    }
-
-    private ConsumerConfigurationData<T> getInternalConsumerConfig() {
-        ConsumerConfigurationData<T> internalConsumerConfig = new 
ConsumerConfigurationData<>();
-        
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
-        internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
-        internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
-        internalConsumerConfig.setConsumerName(consumerName);
-        
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
-        internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
-        internalConsumerConfig.setProperties(conf.getProperties());
-        internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
-        if (null != conf.getConsumerEventListener()) {
-            
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
-        }
-
-        int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
-                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
-        internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
-
-        if (conf.getCryptoKeyReader() != null) {
-            
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-            
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
-        }
-        if (conf.getAckTimeoutMillis() != 0) {
-            
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
-        }
-
-        return internalConsumerConfig;
-    }
-
-    @Override
-    public void redeliverUnacknowledgedMessages() {
-        synchronized (this) {
-            for (ConsumerImpl<T> c : consumers) {
-                c.redeliverUnacknowledgedMessages();
-            }
-            incomingMessages.clear();
-            unAckedMessageTracker.clear();
-            resumeReceivingFromPausedConsumersIfNeeded();
-        }
-    }
-
-    @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
-        checkArgument(messageIds.stream().findFirst().get() instanceof 
MessageIdImpl);
-        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
-            // We cannot redeliver single messages if subscription type is not 
Shared
-            redeliverUnacknowledgedMessages();
-            return;
-        }
-        removeExpiredMessagesFromQueue(messageIds);
-        messageIds.stream()
-            .map(messageId -> (MessageIdImpl)messageId)
-            .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, 
Collectors.toSet()))
-            .forEach((partitionIndex, messageIds1) ->
-                consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
-                    messageIds1.stream().map(mid -> 
(MessageId)mid).collect(Collectors.toSet())));
-        resumeReceivingFromPausedConsumersIfNeeded();
-    }
-
-    @Override
-    public void seek(MessageId messageId) throws PulsarClientException {
-        try {
-            seekAsync(messageId).get();
-        } catch (ExecutionException e) {
-            throw new PulsarClientException(e.getCause());
-        } catch (InterruptedException e) {
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        return FutureUtil.failedFuture(new PulsarClientException("Seek 
operation not supported on partitioned topics"));
-    }
-
-    List<ConsumerImpl<T>> getConsumers() {
-        return consumers;
-    }
-
-    @Override
-    public int getAvailablePermits() {
-        return 
consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
-    }
-
-    @Override
-    public boolean hasReachedEndOfTopic() {
-        return consumers.stream().allMatch(Consumer::hasReachedEndOfTopic);
-    }
-
-    @Override
-    public int numMessagesInQueue() {
-        return incomingMessages.size() + 
consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
-    }
-
-    @Override
-    public synchronized ConsumerStatsRecorderImpl getStats() {
-        if (stats == null) {
-            return null;
-        }
-        stats.reset();
-        for (int i = 0; i < numPartitions; i++) {
-            stats.updateCumulativeStats(consumers.get(i).getStats());
-        }
-        return stats;
-    }
-
-    public UnAckedMessageTracker getUnAckedMessageTracker() {
-        return unAckedMessageTracker;
-    }
-
-    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
-        Message<T> peek = incomingMessages.peek();
-        if (peek != null) {
-            if (!messageIds.contains(peek.getMessageId())) {
-                // first message is not expired, then no message is expired in 
queue.
-                return;
-            }
-
-            // try not to remove elements that are added while we remove
-            Message<T> message = incomingMessages.poll();
-            while (message != null) {
-                MessageIdImpl messageId = (MessageIdImpl) 
message.getMessageId();
-                if (!messageIds.contains(messageId)) {
-                    messageIds.add(messageId);
-                    break;
-                }
-                message = incomingMessages.poll();
-            }
-        }
-    }
-
-    private static final Logger log = 
LoggerFactory.getLogger(PartitionedConsumerImpl.class);
-}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
similarity index 89%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
rename to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index f9cf550..d0b0c60 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -41,17 +41,17 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> 
implements TimerTask {
+public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T> implements TimerTask {
     private final Pattern topicsPattern;
     private final TopicsChangedListener topicsChangeListener;
     private volatile Timeout recheckPatternTimeout = null;
 
-    public PatternTopicsConsumerImpl(Pattern topicsPattern,
-                                     PulsarClientImpl client,
-                                     ConsumerConfigurationData<T> conf,
-                                     ExecutorService listenerExecutor,
-                                     CompletableFuture<Consumer<T>> 
subscribeFuture,
-                                     Schema<T> schema) {
+    public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
+                                          PulsarClientImpl client,
+                                          ConsumerConfigurationData<T> conf,
+                                          ExecutorService listenerExecutor,
+                                          CompletableFuture<Consumer<T>> 
subscribeFuture,
+                                          Schema<T> schema) {
         super(client, conf, listenerExecutor, subscribeFuture, schema);
         this.topicsPattern = topicsPattern;
 
@@ -86,7 +86,7 @@ public class PatternTopicsConsumerImpl<T> extends 
TopicsConsumerImpl<T> implemen
             }
 
             List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
-            List<String> oldTopics = 
PatternTopicsConsumerImpl.this.getTopics();
+            List<String> oldTopics = 
PatternMultiTopicsConsumerImpl.this.getTopics();
 
             
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, 
oldTopics)));
             
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, 
newTopics)));
@@ -100,7 +100,7 @@ public class PatternTopicsConsumerImpl<T> extends 
TopicsConsumerImpl<T> implemen
         });
 
         // schedule the next re-check task
-        client.timer().newTimeout(PatternTopicsConsumerImpl.this,
+        client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
             Math.min(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.MINUTES);
     }
 
@@ -109,9 +109,9 @@ public class PatternTopicsConsumerImpl<T> extends 
TopicsConsumerImpl<T> implemen
     }
 
     interface TopicsChangedListener {
-        // unsubscribe and delete ConsumerImpl in the `consumers` map in 
`TopicsConsumerImpl` based on added topics.
+        // unsubscribe and delete ConsumerImpl in the `consumers` map in 
`MultiTopicsConsumerImpl` based on added topics.
         CompletableFuture<Void> onTopicsRemoved(Collection<String> 
removedTopics);
-        // subscribe and create a list of new ConsumerImpl, added them to the 
`consumers` map in `TopicsConsumerImpl`.
+        // subscribe and create a list of new ConsumerImpl, added them to the 
`consumers` map in `MultiTopicsConsumerImpl`.
         CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
     }
 
@@ -181,5 +181,5 @@ public class PatternTopicsConsumerImpl<T> extends 
TopicsConsumerImpl<T> implemen
         return recheckPatternTimeout;
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
+    private static final Logger log = 
LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 47e953b..dd07cf9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -384,8 +384,8 @@ public class PulsarClientImpl implements PulsarClient {
             // gets the next single threaded executor from the list of 
executors
             ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             if (metadata.partitions > 1) {
-                consumer = new 
PartitionedConsumerImpl<>(PulsarClientImpl.this, conf, metadata.partitions, 
listenerThread,
-                        consumerSubscribedFuture, schema);
+                consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
+                    listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema);
             } else {
                 consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, 
conf, listenerThread, -1,
                         consumerSubscribedFuture, schema);
@@ -406,7 +406,7 @@ public class PulsarClientImpl implements PulsarClient {
     private <T> CompletableFuture<Consumer<T>> 
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
-        ConsumerBase<T> consumer = new 
TopicsConsumerImpl<>(PulsarClientImpl.this, conf,
+        ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
                 externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema);
 
         synchronized (consumers) {
@@ -436,7 +436,7 @@ public class PulsarClientImpl implements PulsarClient {
 
                 List<String> topicsList = topicsPatternFilter(topics, 
conf.getTopicsPattern());
                 conf.getTopicNames().addAll(topicsList);
-                ConsumerBase<T> consumer = new 
PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+                ConsumerBase<T> consumer = new 
PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
                     conf,
                     externalExecutorProvider.getExecutor(),
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 6f32188..5ae452e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -27,7 +27,7 @@ public class TopicMessageImpl<T> implements Message<T> {
 
     private final String topicName;
     private final Message<T> msg;
-    private final MessageId msgId;
+    private final TopicMessageIdImpl msgId;
 
     TopicMessageImpl(String topicName,
                      Message<T> msg) {
@@ -49,6 +49,10 @@ public class TopicMessageImpl<T> implements Message<T> {
         return msgId;
     }
 
+    public MessageId getInnerMessageId() {
+        return msgId.getInnerMessageId();
+    }
+
     @Override
     public Map<String, String> getProperties() {
         return msg.getProperties();

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to