merlimat closed pull request #1365: Delete PartitionedConsumerImpl, use 
TopicsConsumerImpl instead
URL: https://github.com/apache/incubator-pulsar/pull/1365
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not display its
diff after the merge, so the complete diff is reproduced below:

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 617682169..08cd7f654 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -44,6 +44,7 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -323,13 +324,11 @@ public void testSimpleConsumerEventsWithPartition() 
throws Exception {
         final String subName = "sub1";
         final int numMsgs = 100;
         Set<String> uniqueMessages = new HashSet<>();
-
         admin.persistentTopics().createPartitionedTopic(topicName, 
numPartitions);
 
         ConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                 .subscriptionType(SubscriptionType.Failover);
 
-
         // 1. two consumers on the same subscription
         ActiveInactiveListenerEvent listener1 = new 
ActiveInactiveListenerEvent();
         ActiveInactiveListenerEvent listener2 = new 
ActiveInactiveListenerEvent();
@@ -374,7 +373,7 @@ public void testSimpleConsumerEventsWithPartition() throws 
Exception {
             }
             totalMessages++;
             consumer1.acknowledge(msg);
-            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            MessageIdImpl msgId = (MessageIdImpl) 
(((TopicMessageImpl)msg).getInnerMessageId());
             receivedPtns.add(msgId.getPartitionIndex());
         }
 
@@ -391,7 +390,7 @@ public void testSimpleConsumerEventsWithPartition() throws 
Exception {
             }
             totalMessages++;
             consumer2.acknowledge(msg);
-            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            MessageIdImpl msgId = (MessageIdImpl) 
(((TopicMessageImpl)msg).getInnerMessageId());
             receivedPtns.add(msgId.getPartitionIndex());
         }
         assertTrue(Sets.difference(listener1.inactivePtns, 
receivedPtns).isEmpty());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
index 4a9912dce..6d6fc924c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java
@@ -208,7 +208,8 @@ public void partitionedProducerSendAsync() throws 
PulsarClientException, PulsarA
         Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
 
         for (int i = 0; i < numberOfMessages; i++) {
-            MessageId messageId = consumer.receive().getMessageId();
+            MessageId topicMessageId = consumer.receive().getMessageId();
+            MessageId messageId = 
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
             log.info("Message ID Received = " + messageId);
             Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
         }
@@ -247,7 +248,9 @@ public void partitionedProducerSend() throws 
PulsarClientException, PulsarAdminE
         Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all 
messages published successfully");
 
         for (int i = 0; i < numberOfMessages; i++) {
-            
Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed 
to receive Message");
+            MessageId topicMessageId = consumer.receive().getMessageId();
+            MessageId messageId = 
((TopicMessageIdImpl)topicMessageId).getInnerMessageId();
+            Assert.assertTrue(messageIds.remove(messageId), "Failed to receive 
Message");
         }
         log.info("Message IDs = " + messageIds);
         Assert.assertEquals(messageIds.size(), 0, "Not all messages received 
successfully");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 4f9cc7002..b3e74f7b9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -161,13 +161,13 @@ public void testBinaryProtoToGetTopicsOfNamespace() 
throws Exception {
             .subscribe();
 
         // 4. verify consumer get methods, to get right number of partitions 
and topics.
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        List<String> topics = ((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
-        List<ConsumerImpl<byte[]>> consumers = 
((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers();
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl<byte[]>> consumers = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
         topics.forEach(topic -> log.debug("topic: {}", topic));
         consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
@@ -175,7 +175,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws 
Exception {
         IntStream.range(0, topics.size()).forEach(index ->
             
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
 
-        ((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> 
log.debug("getTopics topic: {}", topic));
+        ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
 
         // 5. produce data
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -235,8 +235,8 @@ public void testTopicsListMinus() throws Exception {
         List<String> oldNames = Lists.newArrayList(topicName1, topicName2, 
topicName3, topicName4);
         List<String> newNames = Lists.newArrayList(topicName3, topicName4, 
topicName5, topicName6);
 
-        List<String> addedNames = 
PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
-        List<String> removedNames = 
PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
+        List<String> addedNames = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
+        List<String> removedNames = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
 
         assertTrue(addedNames.size() == 2 &&
             addedNames.contains(topicName5) &&
@@ -246,21 +246,21 @@ public void testTopicsListMinus() throws Exception {
             removedNames.contains(topicName2));
 
         // totally 2 different list, should return content of first lists.
-        List<String> addedNames2 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
+        List<String> addedNames2 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
         assertTrue(addedNames2.size() == 2 &&
             addedNames2.contains(topicName5) &&
             addedNames2.contains(topicName6));
 
         // 2 same list, should return empty list.
-        List<String> addedNames3 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
+        List<String> addedNames3 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
         assertEquals(addedNames3.size(), 0);
 
         // empty list minus: addedNames2.size = 2, addedNames3.size = 0
-        List<String> addedNames4 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
+        List<String> addedNames4 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
         assertTrue(addedNames4.size() == addedNames2.size());
         addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));
 
-        List<String> addedNames5 = 
PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
+        List<String> addedNames5 = 
PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
         assertEquals(addedNames5.size(), 0);
     }
 
@@ -290,10 +290,10 @@ public void testStartEmptyPatternConsumer() throws 
Exception {
             .subscribe();
 
         // 3. verify consumer get methods, to get 0 number of partitions and 
topics.
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 0);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 0);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 0);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 0);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 0);
 
         // 4. create producer
         String messagePredicate = "my-message-" + key + "-";
@@ -310,15 +310,15 @@ public void testStartEmptyPatternConsumer() throws 
Exception {
 
         // 5. call recheckTopics to subscribe each added topics above
         log.debug("recheck topics change");
-        PatternTopicsConsumerImpl<byte[]> consumer1 = 
((PatternTopicsConsumerImpl<byte[]>) consumer);
+        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
 
         // 6. verify consumer get methods, to get number of partitions and 
topics, value 6=1+2+3.
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
 
         // 7. produce data
@@ -384,13 +384,13 @@ public void testAutoSubscribePatternConsumer() throws 
Exception {
             .receiverQueueSize(4)
             .subscribe();
 
-        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+        assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 6 number of partitions and 
topics: 6=1+2+3
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
         // 5. produce data to topic 1,2,3; verify should receive all the 
message
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -419,12 +419,12 @@ public void testAutoSubscribePatternConsumer() throws 
Exception {
 
         // 7. call recheckTopics to subscribe each added topics above, verify 
topics number: 10=1+2+3+4
         log.debug("recheck topics change");
-        PatternTopicsConsumerImpl<byte[]> consumer1 = 
((PatternTopicsConsumerImpl<byte[]>) consumer);
+        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 10);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 10);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 4);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 10);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 10);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 4);
 
         // 8. produce data to topic3 and topic4, verify should receive all the 
message
         for (int i = 0; i < totalMessages / 2; i++) {
@@ -487,13 +487,13 @@ public void testAutoUnbubscribePatternConsumer() throws 
Exception {
             .receiverQueueSize(4)
             .subscribe();
 
-        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+        assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
 
         // 4. verify consumer get methods, to get 0 number of partitions and 
topics: 6=1+2+3
-        assertSame(pattern, ((PatternTopicsConsumerImpl<?>) 
consumer).getPattern());
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
-        assertEquals(((PatternTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
+        assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPattern());
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getConsumers().size(), 6);
+        assertEquals(((PatternMultiTopicsConsumerImpl<?>) 
consumer).getTopics().size(), 3);
 
         // 5. produce data to topic 1,2,3; verify should receive all the 
message
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -521,12 +521,12 @@ public void testAutoUnbubscribePatternConsumer() throws 
Exception {
 
         // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics 
number: 2=6-1-3
         log.debug("recheck topics change");
-        PatternTopicsConsumerImpl<byte[]> consumer1 = 
((PatternTopicsConsumerImpl<byte[]>) consumer);
+        PatternMultiTopicsConsumerImpl<byte[]> consumer1 = 
((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
         consumer1.run(consumer1.getRecheckPatternTimeout());
         Thread.sleep(100);
-        assertEquals(((PatternTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
-        assertEquals(((PatternTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
-        assertEquals(((PatternTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 1);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics().size(), 2);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getConsumers().size(), 2);
+        assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size(), 1);
 
         // 8. produce data to topic2, verify should receive all the message
         for (int i = 0; i < totalMessages; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 7a94cc26e..053cb5ea1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -338,7 +338,7 @@ public void testFailoverAckedNormalTopic() throws Exception 
{
     }
 
     private static long 
getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) {
-        PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>) 
c;
+        MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>) 
c;
         return pc.getUnAckedMessageTracker().size()
                 + pc.getConsumers().stream().mapToLong(consumer -> 
consumer.getUnAckedMessageTracker().size()).sum();
     }
@@ -405,8 +405,8 @@ public void testSharedAckedPartitionedTopic() throws 
Exception {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((PartitionedConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
-        ((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
+        ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
+        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 952dfac22..6aa574ddf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -116,10 +116,10 @@ public void testGetConsumersAndGetTopics() throws 
Exception {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
-        List<String> topics = ((TopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
-        List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) 
consumer).getConsumers();
+        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
 
         topics.forEach(topic -> log.info("topic: {}", topic));
         consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
@@ -127,7 +127,7 @@ public void testGetConsumersAndGetTopics() throws Exception 
{
         IntStream.range(0, 6).forEach(index ->
             
assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
 
-        assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() 
== 3);
+        assertTrue(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size() == 3);
 
         consumer.unsubscribe();
         consumer.close();
@@ -167,7 +167,7 @@ public void testSyncProducerAndConsumer() throws Exception {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -228,7 +228,7 @@ public void testAsyncConsumer() throws Exception {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // Asynchronously produce messages
         List<Future<MessageId>> futures = Lists.newArrayList();
@@ -307,7 +307,7 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -323,7 +323,7 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
             log.debug("Consumer received : " + new String(message.getData()));
             message = consumer.receive(500, TimeUnit.MILLISECONDS);
         }
-        long size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        long size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, totalMessages);
 
@@ -338,7 +338,7 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
             message = consumer.receive(500, TimeUnit.MILLISECONDS);
         } while (message != null);
 
-        size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
         assertEquals(hSet.size(), totalMessages);
@@ -361,14 +361,14 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
             consumer.acknowledge(message);
             message = consumer.receive(100, TimeUnit.MILLISECONDS);
         }
-        size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
         assertEquals(received, totalMessages);
 
         // 8. Simulate ackTimeout
-        ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
-        ((TopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> 
c.getUnAckedMessageTracker().toggle());
+        ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().toggle();
+        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c 
-> c.getUnAckedMessageTracker().toggle());
 
         // 9. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -384,7 +384,7 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
             log.debug("Consumer received : " + data);
             message = consumer.receive(100, TimeUnit.MILLISECONDS);
         }
-        size = ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.debug(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 30);
 
@@ -402,7 +402,7 @@ public void testConsumerUnackedRedelivery() throws 
Exception {
             message = consumer.receive(100, TimeUnit.MILLISECONDS);
         }
         assertEquals(redelivered, 30);
-        size =  ((TopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
+        size =  ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getUnAckedMessageTracker().size();
         log.info(key + " Unacked Message Tracker size is " + size);
         assertEquals(size, 0);
 
@@ -447,7 +447,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws 
Exception {
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
             .receiverQueueSize(4)
             .subscribe();
-        assertTrue(consumer instanceof TopicsConsumerImpl);
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
 
         // 3. producer publish messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -468,7 +468,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws 
Exception {
         assertEquals(messageSet, totalMessages);
 
         // 4, unsubscribe topic3
-        CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl<byte[]>) 
consumer).unsubscribeAsync(topicName3);
+        CompletableFuture<Void> unsubFuture = 
((MultiTopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3);
         unsubFuture.get();
 
         // 5. producer publish messages
@@ -491,15 +491,15 @@ public void testSubscribeUnsubscribeSingleTopic() throws 
Exception {
         assertEquals(messageSet, totalMessages * 2 / 3);
 
         // 7. use getter to verify internal topics number after un-subscribe 
topic3
-        List<String> topics = ((TopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
-        List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) 
consumer).getConsumers();
+        List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) 
consumer).getConsumers();
 
         assertEquals(topics.size(), 3);
         assertEquals(consumers.size(), 3);
-        assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() 
== 2);
+        assertTrue(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size() == 2);
 
         // 8. re-subscribe topic3
-        CompletableFuture<Void> subFuture = 
((TopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
+        CompletableFuture<Void> subFuture = 
((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
         subFuture.get();
 
         // 9. producer publish messages
@@ -522,12 +522,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws 
Exception {
         assertEquals(messageSet, totalMessages);
 
         // 11. use getter to verify internal topics number after subscribe 
topic3
-        topics = ((TopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
-        consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+        topics = ((MultiTopicsConsumerImpl<byte[]>) 
consumer).getPartitionedTopics();
+        consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers();
 
         assertEquals(topics.size(), 6);
         assertEquals(consumers.size(), 6);
-        assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() 
== 3);
+        assertTrue(((MultiTopicsConsumerImpl<byte[]>) 
consumer).getTopics().size() == 3);
 
         consumer.unsubscribe();
         consumer.close();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2cbd64224..2a88bf051 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -914,7 +914,7 @@ protected synchronized void messageProcessed(Message<?> 
msg) {
                 id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), 
getPartitionIndex());
             }
             if (partitionIndex != -1) {
-                // we should no longer track this message, 
PartitionedConsumerImpl will take care from now onwards
+                // we should no longer track this message, TopicsConsumer will 
take care from now onwards
                 unAckedMessageTracker.remove(id);
             } else {
                 unAckedMessageTracker.add(id);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
similarity index 74%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
rename to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 1089cef53..6da72ec08 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -56,7 +56,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TopicsConsumerImpl<T> extends ConsumerBase<T> {
+public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     // All topics should be in same namespace
     protected NamespaceName namespaceName;
@@ -76,15 +76,15 @@
     private final int sharedQueueResumeThreshold;
 
     // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
-    AtomicInteger numberTopicPartitions;
+    AtomicInteger allTopicPartitionsNumber;
 
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final ConsumerStatsRecorder stats;
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
-    TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> 
conf, ExecutorService listenerExecutor,
-                       CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
+    MultiTopicsConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
+                            CompletableFuture<Consumer<T>> subscribeFuture, 
Schema<T> schema) {
         super(client, "TopicsConsumerFakeTopicName" + 
ConsumerName.generateRandomName(), conf,
                 Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, 
subscribeFuture, schema);
 
@@ -95,7 +95,7 @@
         this.consumers = new ConcurrentHashMap<>();
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
-        this.numberTopicPartitions = new AtomicInteger(0);
+        this.allTopicPartitionsNumber = new AtomicInteger(0);
 
         if (conf.getAckTimeoutMillis() != 0) {
             this.unAckedMessageTracker = new 
UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
@@ -109,7 +109,7 @@
         if (conf.getTopicNames().isEmpty()) {
             this.namespaceName = null;
             setState(State.Ready);
-            subscribeFuture().complete(TopicsConsumerImpl.this);
+            subscribeFuture().complete(MultiTopicsConsumerImpl.this);
             return;
         }
 
@@ -122,15 +122,15 @@
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
                 try {
-                    if (numberTopicPartitions.get() > maxReceiverQueueSize) {
-                        setMaxReceiverQueueSize(numberTopicPartitions.get());
+                    if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) 
{
+                        
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
                     }
                     setState(State.Ready);
                     // We have successfully created N consumers, so we can 
start receiving messages now
                     
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
-                    subscribeFuture().complete(TopicsConsumerImpl.this);
+                    subscribeFuture().complete(MultiTopicsConsumerImpl.this);
                     log.info("[{}] [{}] Created topics consumer with {} 
sub-consumers",
-                        topic, subscription, numberTopicPartitions.get());
+                        topic, subscription, allTopicPartitionsNumber.get());
                 } catch (PulsarClientException e) {
                     log.warn("[{}] Failed startReceivingMessages while 
subscribe topics: {}", topic, e.getMessage());
                     subscribeFuture.completeExceptionally(e);
@@ -245,7 +245,7 @@ private void messageReceived(ConsumerImpl<T> consumer, 
Message<T> message) {
             } else {
                 // Enqueue the message so that it can be retrieved when 
application calls receive()
                 // Waits for the queue to have space for the message
-                // This should never block cause TopicsConsumerImpl should 
always use GrowableArrayBlockingQueue
+                // This should never block cause MultiTopicsConsumerImpl 
should always use GrowableArrayBlockingQueue
                 incomingMessages.put(topicMessage);
             }
         } catch (InterruptedException e) {
@@ -271,7 +271,7 @@ private void messageReceived(ConsumerImpl<T> consumer, 
Message<T> message) {
                         log.debug("[{}][{}] Calling message listener for 
message {}",
                             topic, subscription, message.getMessageId());
                     }
-                    listener.received(TopicsConsumerImpl.this, msg);
+                    listener.received(MultiTopicsConsumerImpl.this, msg);
                 } catch (Throwable t) {
                     log.error("[{}][{}] Message listener error in processing 
message: {}",
                         topic, subscription, message, t);
@@ -613,117 +613,172 @@ private boolean topicNameValid(String topicName) {
         }
 
         CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
-        final AtomicInteger partitionNumber = new AtomicInteger(0);
 
-        client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Received topic {} metadata.partitions: {}", 
topicName, metadata.partitions);
-            }
+        client.getPartitionedTopicMetadata(topicName)
+            .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, 
topicName, metadata.partitions))
+            .exceptionally(ex1 -> {
+                log.warn("[{}] Failed to get partitioned topic metadata: {}", 
topicName, ex1.getMessage());
+                subscribeResult.completeExceptionally(ex1);
+                return null;
+            });
 
-            List<CompletableFuture<Consumer<T>>> futureList;
-
-            if (metadata.partitions > 1) {
-                this.topics.putIfAbsent(topicName, metadata.partitions);
-                numberTopicPartitions.addAndGet(metadata.partitions);
-                partitionNumber.addAndGet(metadata.partitions);
-
-                futureList = IntStream
-                    .range(0, partitionNumber.get())
-                    .mapToObj(
-                        partitionIndex -> {
-                            String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
-                            CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                            ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, internalConfig,
-                                    
client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, 
schema);
-                            consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
-                            return subFuture;
-                        })
-                    .collect(Collectors.toList());
-            } else {
-                this.topics.putIfAbsent(topicName, 1);
-                numberTopicPartitions.incrementAndGet();
-                partitionNumber.incrementAndGet();
+        return subscribeResult;
+    }
 
-                CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
-                ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, 
topicName, internalConfig,
-                        client.externalExecutorProvider().getExecutor(), 0, 
subFuture, schema);
-                consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+    /**
+     * Creates a consumer for a single topic whose partition count is already known.
+     *
+     * <p>The topic is removed from a clone of {@code conf} so that the constructor builds a
+     * consumer with no topics. Once that empty consumer is ready, the topic is subscribed with the
+     * supplied partition count instead of looking the partition metadata up again.
+     *
+     * @param client           client that owns the new consumer
+     * @param conf             consumer configuration; must name exactly one topic
+     * @param listenerExecutor executor handed to the consumer constructor
+     * @param subscribeFuture  completed with the consumer once the topic is subscribed, or
+     *                         completed exceptionally if the subscription fails
+     * @param numPartitions    number of partitions of the topic
+     * @param schema           schema handed to the consumer constructor
+     * @return the new consumer; callers should wait on {@code subscribeFuture} before using it
+     * @throws IllegalArgumentException if {@code conf} does not name exactly one topic
+     */
+    public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client,
+                                                                           ConsumerConfigurationData<T> conf,
+                                                                           ExecutorService listenerExecutor,
+                                                                           CompletableFuture<Consumer<T>> subscribeFuture,
+                                                                           int numPartitions,
+                                                                           Schema<T> schema) {
+        checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer");
+
+        // get topic name, then remove it from the cloned conf, so constructor will create a consumer with no topic.
+        ConsumerConfigurationData<T> cloneConf = conf.clone();
+        String topicName = cloneConf.getSingleTopic();
+        cloneConf.getTopicNames().remove(topicName);
+
+        CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
+        MultiTopicsConsumerImpl<T> consumer =
+            new MultiTopicsConsumerImpl<>(client, cloneConf, listenerExecutor, future, schema);
+
+        // 'future' is completed by the constructor with this very consumer, so subscribe on it directly.
+        future.thenCompose(c -> consumer.subscribeAsync(topicName, numPartitions))
+            .thenRun(() -> subscribeFuture.complete(consumer))
+            .exceptionally(e -> {
+                // The Throwable is passed as the trailing argument so the logger records the stack trace.
+                log.warn("Failed subscription for createPartitionedConsumer: {} {}", topicName, numPartitions, e);
+                // Unwrap the CompletionException added by the chained stages, but never hand
+                // null to completeExceptionally (it would throw and leave the caller hanging).
+                Throwable cause = e.getCause() != null ? e.getCause() : e;
+                subscribeFuture.completeExceptionally(cause);
+                return null;
+            });
+        return consumer;
+    }
 
-                futureList = Collections.singletonList(subFuture);
-            }
+    // subscribe to one more topic whose number of partitions is already known
+    private CompletableFuture<Void> subscribeAsync(String topicName, int 
numberPartitions) {
+        if (!topicNameValid(topicName)) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topic name 
not valid"));
+        }
 
-            FutureUtil.waitForAll(futureList)
-                .thenAccept(finalFuture -> {
-                    try {
-                        if (numberTopicPartitions.get() > 
maxReceiverQueueSize) {
-                            
setMaxReceiverQueueSize(numberTopicPartitions.get());
-                        }
-                        int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
-                        checkState(numberTopicPartitions.get() == numTopics,
-                            "numberTopicPartitions " + 
numberTopicPartitions.get()
-                                + " not equals expected: " + numTopics);
-
-                        // We have successfully created new consumers, so we 
can start receiving messages for them
-                        startReceivingMessages(
-                            consumers.values().stream()
-                                .filter(consumer1 -> {
-                                    String consumerTopicName = 
consumer1.getTopic();
-                                    if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-                                        
TopicName.get(topicName).getPartitionedTopicName().toString())) {
-                                        return true;
-                                    } else {
-                                        return false;
-                                    }
-                                })
-                                .collect(Collectors.toList()));
-
-                        subscribeResult.complete(null);
-                        log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, numberTopicPartitions {}",
-                            topic, subscription, topicName, 
numberTopicPartitions.get());
-                        if (this.namespaceName == null) {
-                            this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
-                        }
-                        return;
-                    } catch (PulsarClientException e) {
-                        handleSubscribeOneTopicError(topicName, e);
-                        subscribeResult.completeExceptionally(e);
-                    }
-                })
-                .exceptionally(ex -> {
-                    handleSubscribeOneTopicError(topicName, ex);
-                    subscribeResult.completeExceptionally(ex);
-                    return null;
-                });
-        }).exceptionally(ex1 -> {
-            log.warn("[{}] Failed to get partitioned topic metadata: {}", 
topicName, ex1.getMessage());
-            subscribeResult.completeExceptionally(ex1);
-            return null;
-        });
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics 
Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+        subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
 
         return subscribeResult;
     }
 
-    // handling failure during subscribe new topic, unsubscribe success 
created partitions
-    private void handleSubscribeOneTopicError(String topicName, Throwable 
error) {
-        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer 
", topic, topicName, error.getMessage());
+    private void subscribeTopicPartitions(CompletableFuture<Void> 
subscribeResult, String topicName, int partitionNumber) {
+        if (log.isDebugEnabled()) {
+            log.debug("Subscribe to topic {} metadata.partitions: {}", 
topicName, partitionNumber);
+        }
 
-        consumers.values().stream().filter(consumer1 -> {
-            String consumerTopicName = consumer1.getTopic();
-            if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
-                return true;
-            } else {
-                return false;
-            }
-        }).forEach(consumer2 ->  {
-            consumer2.closeAsync().handle((ok, closeException) -> {
-                consumer2.subscribeFuture().completeExceptionally(error);
+        List<CompletableFuture<Consumer<T>>> futureList;
+
+        if (partitionNumber > 1) {
+            this.topics.putIfAbsent(topicName, partitionNumber);
+            allTopicPartitionsNumber.addAndGet(partitionNumber);
+
+            futureList = IntStream
+                .range(0, partitionNumber)
+                .mapToObj(
+                    partitionIndex -> {
+                        String partitionName = 
TopicName.get(topicName).getPartition(partitionIndex).toString();
+                        CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+                        ConsumerImpl<T> newConsumer = new 
ConsumerImpl<>(client, partitionName, internalConfig,
+                            client.externalExecutorProvider().getExecutor(), 
partitionIndex, subFuture, schema);
+                        consumers.putIfAbsent(newConsumer.getTopic(), 
newConsumer);
+                        return subFuture;
+                    })
+                .collect(Collectors.toList());
+        } else {
+            this.topics.putIfAbsent(topicName, 1);
+            allTopicPartitionsNumber.incrementAndGet();
+
+            CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
+            ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, 
topicName, internalConfig,
+                client.externalExecutorProvider().getExecutor(), 0, subFuture, 
schema);
+            consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+            futureList = Collections.singletonList(subFuture);
+        }
+
+        FutureUtil.waitForAll(futureList)
+            .thenAccept(finalFuture -> {
+                try {
+                    if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) 
{
+                        
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
+                    }
+                    int numTopics = 
this.topics.values().stream().mapToInt(Integer::intValue).sum();
+                    checkState(allTopicPartitionsNumber.get() == numTopics,
+                        "allTopicPartitionsNumber " + 
allTopicPartitionsNumber.get()
+                            + " not equals expected: " + numTopics);
+
+                    // We have successfully created new consumers, so we can 
start receiving messages for them
+                    startReceivingMessages(
+                        consumers.values().stream()
+                            .filter(consumer1 -> {
+                                String consumerTopicName = 
consumer1.getTopic();
+                                if 
(TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+                                    
TopicName.get(topicName).getPartitionedTopicName().toString())) {
+                                    return true;
+                                } else {
+                                    return false;
+                                }
+                            })
+                            .collect(Collectors.toList()));
+
+                    subscribeResult.complete(null);
+                    log.info("[{}] [{}] Success subscribe new topic {} in 
topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
+                        topic, subscription, topicName, partitionNumber, 
allTopicPartitionsNumber.get());
+                    if (this.namespaceName == null) {
+                        this.namespaceName = 
TopicName.get(topicName).getNamespaceObject();
+                    }
+                    return;
+                } catch (PulsarClientException e) {
+                    handleSubscribeOneTopicError(topicName, e, 
subscribeResult);
+                }
+            })
+            .exceptionally(ex -> {
+                handleSubscribeOneTopicError(topicName, ex, subscribeResult);
                 return null;
             });
-            consumers.remove(consumer2.getTopic());
-        });
+    }
 
-        topics.remove(topicName);
-        checkState(numberTopicPartitions.get() == consumers.values().size());
+    // Handles a failure while subscribing to a new topic: closes every sub-consumer that was
+    // already created for that topic, rolls back the partition bookkeeping, and only then fails
+    // subscribeFuture with the original error.
+    //
+    // topicName       - topic whose subscription failed
+    // error           - the failure; propagated to each sub-consumer's future and to subscribeFuture
+    // subscribeFuture - future of the failed subscribe call; always completed exceptionally here
+    private void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) {
+        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
+
+        client.externalExecutorProvider().getExecutor().submit(() -> {
+            // Normalize the name the same way subscribeTopicPartitions does when it selects the
+            // sub-consumers of this topic, so both sides of the comparison have the same form.
+            String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
+            List<ConsumerImpl<T>> toClose = consumers.values().stream()
+                .filter(consumer1 -> TopicName.get(consumer1.getTopic()).getPartitionedTopicName()
+                    .equals(partitionedTopicName))
+                .collect(Collectors.toList());
+
+            if (toClose.isEmpty()) {
+                // Nothing to close: fail the caller right away, otherwise the future never completes.
+                topics.remove(topicName);
+                subscribeFuture.completeExceptionally(error);
+                return;
+            }
+
+            // Fix the count before starting any close, so a close that finishes early cannot
+            // drive the counter to zero while other sub-consumers are still waiting to be closed.
+            AtomicInteger toCloseNum = new AtomicInteger(toClose.size());
+            toClose.forEach(consumer2 ->
+                consumer2.closeAsync().whenComplete((r, ex) -> {
+                    consumer2.subscribeFuture().completeExceptionally(error);
+                    allTopicPartitionsNumber.decrementAndGet();
+                    consumers.remove(consumer2.getTopic());
+                    // The last close to finish performs the topic-level cleanup.
+                    if (toCloseNum.decrementAndGet() == 0) {
+                        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
+                            topic, topicName, error.getMessage());
+                        topics.remove(topicName);
+                        checkState(allTopicPartitionsNumber.get() == consumers.values().size());
+                        subscribeFuture.completeExceptionally(error);
+                    }
+                }));
+        });
     }
 
     // un-subscribe a given topic
@@ -757,15 +812,15 @@ private void handleSubscribeOneTopicError(String 
topicName, Throwable error) {
                     consumersToUnsub.forEach(consumer1 -> {
                         consumers.remove(consumer1.getTopic());
                         pausedConsumers.remove(consumer1);
-                        numberTopicPartitions.decrementAndGet();
+                        allTopicPartitionsNumber.decrementAndGet();
                     });
 
                     topics.remove(topicName);
                     ((UnAckedTopicMessageTracker) 
unAckedMessageTracker).removeTopicMessages(topicName);
 
                     unsubscribeFuture.complete(null);
-                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, 
numberTopicPartitions: {}",
-                        topicName, subscription, consumerName, 
numberTopicPartitions);
+                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, 
allTopicPartitionsNumber: {}",
+                        topicName, subscription, consumerName, 
allTopicPartitionsNumber);
                 } else {
                     unsubscribeFuture.completeExceptionally(ex);
                     setState(State.Failed);
@@ -792,5 +847,5 @@ private void handleSubscribeOneTopicError(String topicName, 
Throwable error) {
         return consumers.values().stream().collect(Collectors.toList());
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(TopicsConsumerImpl.class);
+    private static final Logger log = 
LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
deleted file mode 100644
index 9c952dd22..000000000
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ /dev/null
@@ -1,553 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class PartitionedConsumerImpl<T> extends ConsumerBase<T> {
-
-    private final List<ConsumerImpl<T>> consumers;
-
-    // Queue of partition consumers on which we have stopped calling 
receiveAsync() because the
-    // shared incoming queue was full
-    private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
-
-    // Threshold for the shared queue. When the size of the shared queue goes 
below the threshold, we are going to
-    // resume receiving from the paused consumer partitions
-    private final int sharedQueueResumeThreshold;
-
-    private final int numPartitions;
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ConsumerStatsRecorderImpl stats;
-    private final UnAckedMessageTracker unAckedMessageTracker;
-
-    PartitionedConsumerImpl(PulsarClientImpl client, 
ConsumerConfigurationData<T> conf, int numPartitions,
-                            ExecutorService listenerExecutor, 
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, 
numPartitions), conf.getReceiverQueueSize()),
-                listenerExecutor, subscribeFuture, schema);
-        this.consumers = Lists.newArrayListWithCapacity(numPartitions);
-        this.pausedConsumers = new ConcurrentLinkedQueue<>();
-        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
-        this.numPartitions = numPartitions;
-
-        if (conf.getAckTimeoutMillis() != 0) {
-            this.unAckedMessageTracker = new UnAckedMessageTracker(client, 
this, conf.getAckTimeoutMillis());
-        } else {
-            this.unAckedMessageTracker = 
UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
-        }
-
-        stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new 
ConsumerStatsRecorderImpl() : null;
-        checkArgument(conf.getReceiverQueueSize() > 0,
-                "Receiver queue size needs to be greater than 0 for 
Partitioned Topics");
-        start();
-    }
-
-    private void start() {
-        AtomicReference<Throwable> subscribeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new AtomicInteger();
-        ConsumerConfigurationData<T> internalConfig = 
getInternalConsumerConfig();
-        for (int partitionIndex = 0; partitionIndex < numPartitions; 
partitionIndex++) {
-            String partitionName = 
TopicName.get(topic).getPartition(partitionIndex).toString();
-            ConsumerImpl<T> consumer = new ConsumerImpl<>(client, 
partitionName, internalConfig,
-                    client.externalExecutorProvider().getExecutor(), 
partitionIndex, new CompletableFuture<>(), schema);
-            consumers.add(consumer);
-            consumer.subscribeFuture().handle((cons, subscribeException) -> {
-                if (subscribeException != null) {
-                    setState(State.Failed);
-                    subscribeFail.compareAndSet(null, subscribeException);
-                    client.cleanupConsumer(this);
-                }
-                if (completed.incrementAndGet() == numPartitions) {
-                    if (subscribeFail.get() == null) {
-                        try {
-                            // We have successfully created N consumers, so we 
can start receiving messages now
-                            starReceivingMessages();
-                            setState(State.Ready);
-                            
subscribeFuture().complete(PartitionedConsumerImpl.this);
-                            log.info("[{}] [{}] Created partitioned consumer", 
topic, subscription);
-                            return null;
-                        } catch (PulsarClientException e) {
-                            subscribeFail.set(e);
-                        }
-                    }
-                    closeAsync().handle((ok, closeException) -> {
-                        
subscribeFuture().completeExceptionally(subscribeFail.get());
-                        client.cleanupConsumer(this);
-                        return null;
-                    });
-                    log.error("[{}] [{}] Could not create partitioned 
consumer.", topic, subscription,
-                            subscribeFail.get().getCause());
-                }
-                return null;
-            });
-        }
-    }
-
-    private void starReceivingMessages() throws PulsarClientException {
-        for (ConsumerImpl<T> consumer : consumers) {
-            
consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize());
-            receiveMessageFromConsumer(consumer);
-        }
-    }
-
-    private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
-        consumer.receiveAsync().thenAccept(message -> {
-            // Process the message, add to the queue and trigger listener or 
async callback
-            messageReceived(message);
-
-            // we're modifying pausedConsumers
-            lock.writeLock().lock();
-            try {
-                int size = incomingMessages.size();
-                if (size >= maxReceiverQueueSize
-                        || (size > sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty())) {
-                    // mark this consumer to be resumed later: if No more 
space left in shared queue,
-                    // or if any consumer is already paused (to create fair 
chance for already paused consumers)
-                    pausedConsumers.add(consumer);
-                } else {
-                    // Schedule next receiveAsync() if the incoming queue is 
not full. Use a different thread to avoid
-                    // recursion and stack overflow
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            } finally {
-                lock.writeLock().unlock();
-            }
-        });
-    }
-
-    private void resumeReceivingFromPausedConsumersIfNeeded() {
-        lock.readLock().lock();
-        try {
-            if (incomingMessages.size() <= sharedQueueResumeThreshold && 
!pausedConsumers.isEmpty()) {
-                while (true) {
-                    ConsumerImpl<T> consumer = pausedConsumers.poll();
-                    if (consumer == null) {
-                        break;
-                    }
-
-                    // if messages are readily available on consumer we will 
attempt to writeLock on the same thread
-                    client.eventLoopGroup().execute(() -> {
-                        receiveMessageFromConsumer(consumer);
-                    });
-                }
-            }
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    @Override
-    protected Message<T> internalReceive() throws PulsarClientException {
-        Message<T> message;
-        try {
-            message = incomingMessages.take();
-            unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
-            resumeReceivingFromPausedConsumersIfNeeded();
-            return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    protected Message<T> internalReceive(int timeout, TimeUnit unit) throws 
PulsarClientException {
-        Message<T> message;
-        try {
-            message = incomingMessages.poll(timeout, unit);
-            if (message != null) {
-                unAckedMessageTracker.add(message.getMessageId());
-            }
-            resumeReceivingFromPausedConsumersIfNeeded();
-            return message;
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    protected CompletableFuture<Message<T>> internalReceiveAsync() {
-        CompletableFuture<Message<T>> result = new CompletableFuture<>();
-        Message<T> message;
-        try {
-            lock.writeLock().lock();
-            message = incomingMessages.poll(0, TimeUnit.SECONDS);
-            if (message == null) {
-                pendingReceives.add(result);
-            } else {
-                unAckedMessageTracker.add(message.getMessageId());
-                resumeReceivingFromPausedConsumersIfNeeded();
-                result.complete(message);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            result.completeExceptionally(new PulsarClientException(e));
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        return result;
-    }
-
-    @Override
-    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
-                                                    Map<String,Long> 
properties) {
-        checkArgument(messageId instanceof MessageIdImpl);
-
-        if (getState() != State.Ready) {
-            return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
-        }
-
-        if (ackType == AckType.Cumulative) {
-            return FutureUtil.failedFuture(new 
PulsarClientException.NotSupportedException(
-                    "Cumulative acknowledge not supported for partitioned 
topics"));
-        } else {
-
-            ConsumerImpl<T> consumer = consumers.get(((MessageIdImpl) 
messageId).getPartitionIndex());
-            return consumer.doAcknowledge(messageId, ackType, 
properties).thenRun(() ->
-                    unAckedMessageTracker.remove(messageId));
-        }
-
-    }
-
-    @Override
-    public CompletableFuture<Void> unsubscribeAsync() {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return FutureUtil.failedFuture(
-                    new 
PulsarClientException.AlreadyClosedException("Partitioned Consumer was already 
closed"));
-        }
-        setState(State.Closing);
-
-        AtomicReference<Throwable> unsubscribeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new AtomicInteger(numPartitions);
-        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
-        for (Consumer<T> consumer : consumers) {
-            if (consumer != null) {
-                consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
-                    if (ex != null) {
-                        unsubscribeFail.compareAndSet(null, ex);
-                    }
-                    if (completed.decrementAndGet() == 0) {
-                        if (unsubscribeFail.get() == null) {
-                            setState(State.Closed);
-                            unAckedMessageTracker.close();
-                            unsubscribeFuture.complete(null);
-                            log.info("[{}] [{}] Unsubscribed Partitioned 
Consumer", topic, subscription);
-                        } else {
-                            setState(State.Failed);
-                            
unsubscribeFuture.completeExceptionally(unsubscribeFail.get());
-                            log.error("[{}] [{}] Could not unsubscribe 
Partitioned Consumer", topic, subscription,
-                                    unsubscribeFail.get().getCause());
-                        }
-                    }
-
-                    return null;
-                });
-            }
-
-        }
-
-        return unsubscribeFuture;
-    }
-
-    @Override
-    public CompletableFuture<Void> closeAsync() {
-        if (getState() == State.Closing || getState() == State.Closed) {
-            unAckedMessageTracker.close();
-            return CompletableFuture.completedFuture(null);
-        }
-        setState(State.Closing);
-
-        AtomicReference<Throwable> closeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new AtomicInteger(numPartitions);
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-        for (Consumer<T> consumer : consumers) {
-            if (consumer != null) {
-                consumer.closeAsync().handle((closed, ex) -> {
-                    if (ex != null) {
-                        closeFail.compareAndSet(null, ex);
-                    }
-                    if (completed.decrementAndGet() == 0) {
-                        if (closeFail.get() == null) {
-                            setState(State.Closed);
-                            unAckedMessageTracker.close();
-                            closeFuture.complete(null);
-                            log.info("[{}] [{}] Closed Partitioned Consumer", 
topic, subscription);
-                            client.cleanupConsumer(this);
-                            // fail all pending-receive futures to notify 
application
-                            failPendingReceive();
-                        } else {
-                            setState(State.Failed);
-                            closeFuture.completeExceptionally(closeFail.get());
-                            log.error("[{}] [{}] Could not close Partitioned 
Consumer", topic, subscription,
-                                    closeFail.get().getCause());
-                        }
-                    }
-
-                    return null;
-                });
-            }
-
-        }
-
-        return closeFuture;
-    }
-
-    private void failPendingReceive() {
-        lock.readLock().lock();
-        try {
-            if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
-                while (!pendingReceives.isEmpty()) {
-                    CompletableFuture<Message<T>> receiveFuture = 
pendingReceives.poll();
-                    if (receiveFuture != null) {
-                        receiveFuture.completeExceptionally(
-                                new 
PulsarClientException.AlreadyClosedException("Consumer is already closed"));
-                    } else {
-                        break;
-                    }
-                }
-            }
-        } finally {
-            lock.readLock().unlock();
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return consumers.stream().allMatch(ConsumerImpl::isConnected);
-    }
-
-    void messageReceived(Message<T> message) {
-        lock.writeLock().lock();
-        try {
-            unAckedMessageTracker.add(message.getMessageId());
-            if (log.isDebugEnabled()) {
-                log.debug("[{}][{}] Received message from partitioned-consumer 
{}", topic, subscription, message.getMessageId());
-            }
-            // if asyncReceive is waiting : return message to callback without 
adding to incomingMessages queue
-            if (!pendingReceives.isEmpty()) {
-                CompletableFuture<Message<T>> receivedFuture = 
pendingReceives.poll();
-                listenerExecutor.execute(() -> 
receivedFuture.complete(message));
-            } else {
-                // Enqueue the message so that it can be retrieved when 
application calls receive()
-                // Waits for the queue to have space for the message
-                // This should never block cause PartitonedConsumerImpl should 
always use GrowableArrayBlockingQueue
-                incomingMessages.put(message);
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        if (listener != null) {
-            // Trigger the notification on the message listener in a separate 
thread to avoid blocking the networking
-            // thread while the message processing happens
-            listenerExecutor.execute(() -> {
-                Message<T> msg;
-                try {
-                    msg = internalReceive();
-                } catch (PulsarClientException e) {
-                    log.warn("[{}] [{}] Failed to dequeue the message for 
listener", topic, subscription, e);
-                    return;
-                }
-
-                try {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}][{}] Calling message listener for 
message {}", topic, subscription, message.getMessageId());
-                    }
-                    listener.received(PartitionedConsumerImpl.this, msg);
-                } catch (Throwable t) {
-                    log.error("[{}][{}] Message listener error in processing 
message: {}", topic, subscription, message,
-                            t);
-                }
-            });
-        }
-    }
-
-    @Override
-    String getHandlerName() {
-        return subscription;
-    }
-
-    private ConsumerConfigurationData<T> getInternalConsumerConfig() {
-        ConsumerConfigurationData<T> internalConsumerConfig = new 
ConsumerConfigurationData<>();
-        
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
-        internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
-        internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
-        internalConsumerConfig.setConsumerName(consumerName);
-        
internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros());
-        internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel());
-        internalConsumerConfig.setProperties(conf.getProperties());
-        internalConsumerConfig.setReadCompacted(conf.isReadCompacted());
-        if (null != conf.getConsumerEventListener()) {
-            
internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener());
-        }
-
-        int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
-                conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / 
numPartitions);
-        internalConsumerConfig.setReceiverQueueSize(receiverQueueSize);
-
-        if (conf.getCryptoKeyReader() != null) {
-            
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
-            
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
-        }
-        if (conf.getAckTimeoutMillis() != 0) {
-            
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
-        }
-
-        return internalConsumerConfig;
-    }
-
-    @Override
-    public void redeliverUnacknowledgedMessages() {
-        synchronized (this) {
-            for (ConsumerImpl<T> c : consumers) {
-                c.redeliverUnacknowledgedMessages();
-            }
-            incomingMessages.clear();
-            unAckedMessageTracker.clear();
-            resumeReceivingFromPausedConsumersIfNeeded();
-        }
-    }
-
-    @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
-        checkArgument(messageIds.stream().findFirst().get() instanceof 
MessageIdImpl);
-        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
-            // We cannot redeliver single messages if subscription type is not 
Shared
-            redeliverUnacknowledgedMessages();
-            return;
-        }
-        removeExpiredMessagesFromQueue(messageIds);
-        messageIds.stream()
-            .map(messageId -> (MessageIdImpl)messageId)
-            .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, 
Collectors.toSet()))
-            .forEach((partitionIndex, messageIds1) ->
-                consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
-                    messageIds1.stream().map(mid -> 
(MessageId)mid).collect(Collectors.toSet())));
-        resumeReceivingFromPausedConsumersIfNeeded();
-    }
-
-    @Override
-    public void seek(MessageId messageId) throws PulsarClientException {
-        try {
-            seekAsync(messageId).get();
-        } catch (ExecutionException e) {
-            throw new PulsarClientException(e.getCause());
-        } catch (InterruptedException e) {
-            throw new PulsarClientException(e);
-        }
-    }
-
-    @Override
-    public CompletableFuture<Void> seekAsync(MessageId messageId) {
-        return FutureUtil.failedFuture(new PulsarClientException("Seek 
operation not supported on partitioned topics"));
-    }
-
-    List<ConsumerImpl<T>> getConsumers() {
-        return consumers;
-    }
-
-    @Override
-    public int getAvailablePermits() {
-        return 
consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
-    }
-
-    @Override
-    public boolean hasReachedEndOfTopic() {
-        return consumers.stream().allMatch(Consumer::hasReachedEndOfTopic);
-    }
-
-    @Override
-    public int numMessagesInQueue() {
-        return incomingMessages.size() + 
consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
-    }
-
-    @Override
-    public synchronized ConsumerStatsRecorderImpl getStats() {
-        if (stats == null) {
-            return null;
-        }
-        stats.reset();
-        for (int i = 0; i < numPartitions; i++) {
-            stats.updateCumulativeStats(consumers.get(i).getStats());
-        }
-        return stats;
-    }
-
-    public UnAckedMessageTracker getUnAckedMessageTracker() {
-        return unAckedMessageTracker;
-    }
-
-    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
-        Message<T> peek = incomingMessages.peek();
-        if (peek != null) {
-            if (!messageIds.contains(peek.getMessageId())) {
-                // first message is not expired, then no message is expired in 
queue.
-                return;
-            }
-
-            // try not to remove elements that are added while we remove
-            Message<T> message = incomingMessages.poll();
-            while (message != null) {
-                MessageIdImpl messageId = (MessageIdImpl) 
message.getMessageId();
-                if (!messageIds.contains(messageId)) {
-                    messageIds.add(messageId);
-                    break;
-                }
-                message = incomingMessages.poll();
-            }
-        }
-    }
-
-    private static final Logger log = 
LoggerFactory.getLogger(PartitionedConsumerImpl.class);
-}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
similarity index 89%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
rename to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index f9cf550e5..d0b0c606c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -41,17 +41,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> 
implements TimerTask {
+public class PatternMultiTopicsConsumerImpl<T> extends 
MultiTopicsConsumerImpl<T> implements TimerTask {
     private final Pattern topicsPattern;
     private final TopicsChangedListener topicsChangeListener;
     private volatile Timeout recheckPatternTimeout = null;
 
-    public PatternTopicsConsumerImpl(Pattern topicsPattern,
-                                     PulsarClientImpl client,
-                                     ConsumerConfigurationData<T> conf,
-                                     ExecutorService listenerExecutor,
-                                     CompletableFuture<Consumer<T>> 
subscribeFuture,
-                                     Schema<T> schema) {
+    public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
+                                          PulsarClientImpl client,
+                                          ConsumerConfigurationData<T> conf,
+                                          ExecutorService listenerExecutor,
+                                          CompletableFuture<Consumer<T>> 
subscribeFuture,
+                                          Schema<T> schema) {
         super(client, conf, listenerExecutor, subscribeFuture, schema);
         this.topicsPattern = topicsPattern;
 
@@ -86,7 +86,7 @@ public void run(Timeout timeout) throws Exception {
             }
 
             List<String> newTopics = 
PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
-            List<String> oldTopics = 
PatternTopicsConsumerImpl.this.getTopics();
+            List<String> oldTopics = 
PatternMultiTopicsConsumerImpl.this.getTopics();
 
             
futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, 
oldTopics)));
             
futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, 
newTopics)));
@@ -100,7 +100,7 @@ public void run(Timeout timeout) throws Exception {
         });
 
         // schedule the next re-check task
-        client.timer().newTimeout(PatternTopicsConsumerImpl.this,
+        client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this,
             Math.min(1, conf.getPatternAutoDiscoveryPeriod()), 
TimeUnit.MINUTES);
     }
 
@@ -109,9 +109,9 @@ public Pattern getPattern() {
     }
 
     interface TopicsChangedListener {
-        // unsubscribe and delete ConsumerImpl in the `consumers` map in 
`TopicsConsumerImpl` based on added topics.
+        // unsubscribe and delete ConsumerImpl in the `consumers` map in 
`MultiTopicsConsumerImpl` based on added topics.
         CompletableFuture<Void> onTopicsRemoved(Collection<String> 
removedTopics);
-        // subscribe and create a list of new ConsumerImpl, added them to the 
`consumers` map in `TopicsConsumerImpl`.
+        // subscribe and create a list of new ConsumerImpl, added them to the 
`consumers` map in `MultiTopicsConsumerImpl`.
         CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
     }
 
@@ -181,5 +181,5 @@ Timeout getRecheckPatternTimeout() {
         return recheckPatternTimeout;
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
+    private static final Logger log = 
LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 47e953b5f..dd07cf9fc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -384,8 +384,8 @@ public ClientConfigurationData getConfiguration() {
             // gets the next single threaded executor from the list of 
executors
             ExecutorService listenerThread = 
externalExecutorProvider.getExecutor();
             if (metadata.partitions > 1) {
-                consumer = new 
PartitionedConsumerImpl<>(PulsarClientImpl.this, conf, metadata.partitions, 
listenerThread,
-                        consumerSubscribedFuture, schema);
+                consumer = 
MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
+                    listenerThread, consumerSubscribedFuture, 
metadata.partitions, schema);
             } else {
                 consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, 
conf, listenerThread, -1,
                         consumerSubscribedFuture, schema);
@@ -406,7 +406,7 @@ public ClientConfigurationData getConfiguration() {
     private <T> CompletableFuture<Consumer<T>> 
multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new 
CompletableFuture<>();
 
-        ConsumerBase<T> consumer = new 
TopicsConsumerImpl<>(PulsarClientImpl.this, conf,
+        ConsumerBase<T> consumer = new 
MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
                 externalExecutorProvider.getExecutor(), 
consumerSubscribedFuture, schema);
 
         synchronized (consumers) {
@@ -436,7 +436,7 @@ public ClientConfigurationData getConfiguration() {
 
                 List<String> topicsList = topicsPatternFilter(topics, 
conf.getTopicsPattern());
                 conf.getTopicNames().addAll(topicsList);
-                ConsumerBase<T> consumer = new 
PatternTopicsConsumerImpl<>(conf.getTopicsPattern(),
+                ConsumerBase<T> consumer = new 
PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
                     conf,
                     externalExecutorProvider.getExecutor(),
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 6f3218821..5ae452ee5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -27,7 +27,7 @@
 
     private final String topicName;
     private final Message<T> msg;
-    private final MessageId msgId;
+    private final TopicMessageIdImpl msgId;
 
     TopicMessageImpl(String topicName,
                      Message<T> msg) {
@@ -49,6 +49,10 @@ public MessageId getMessageId() {
         return msgId;
     }
 
+    public MessageId getInnerMessageId() {
+        return msgId.getInnerMessageId();
+    }
+
     @Override
     public Map<String, String> getProperties() {
         return msg.getProperties();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to