merlimat closed pull request #1365: Delete PartitionedConsumerImpl, use TopicsConsumerImpl instead
URL: https://github.com/apache/incubator-pulsar/pull/1365
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 617682169..08cd7f654 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; @@ -323,13 +324,11 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { final String subName = "sub1"; final int numMsgs = 100; Set<String> uniqueMessages = new HashSet<>(); - admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName) .subscriptionType(SubscriptionType.Failover); - // 1. 
two consumers on the same subscription ActiveInactiveListenerEvent listener1 = new ActiveInactiveListenerEvent(); ActiveInactiveListenerEvent listener2 = new ActiveInactiveListenerEvent(); @@ -374,7 +373,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer1.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); receivedPtns.add(msgId.getPartitionIndex()); } @@ -391,7 +390,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { } totalMessages++; consumer2.acknowledge(msg); - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId()); receivedPtns.add(msgId.getPartitionIndex()); } assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java index 4a9912dce..6d6fc924c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageIdTest.java @@ -208,7 +208,8 @@ public void partitionedProducerSendAsync() throws PulsarClientException, PulsarA Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully"); for (int i = 0; i < numberOfMessages; i++) { - MessageId messageId = consumer.receive().getMessageId(); + MessageId topicMessageId = consumer.receive().getMessageId(); + MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId(); log.info("Message ID Received = " + messageId); Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message"); } @@ -247,7 +248,9 @@ public void partitionedProducerSend() throws PulsarClientException, PulsarAdminE 
Assert.assertEquals(messageIds.size(), numberOfMessages, "Not all messages published successfully"); for (int i = 0; i < numberOfMessages; i++) { - Assert.assertTrue(messageIds.remove(consumer.receive().getMessageId()), "Failed to receive Message"); + MessageId topicMessageId = consumer.receive().getMessageId(); + MessageId messageId = ((TopicMessageIdImpl)topicMessageId).getInnerMessageId(); + Assert.assertTrue(messageIds.remove(messageId), "Failed to receive Message"); } log.info("Message IDs = " + messageIds); Assert.assertEquals(messageIds.size(), 0, "Not all messages received successfully"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 4f9cc7002..b3e74f7b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -161,13 +161,13 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { .subscribe(); // 4. verify consumer get methods, to get right number of partitions and topics. 
- assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - List<String> topics = ((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics(); - List<ConsumerImpl<byte[]>> consumers = ((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers(); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + List<String> topics = ((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics(); + List<ConsumerImpl<byte[]>> consumers = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers(); assertEquals(topics.size(), 6); assertEquals(consumers.size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); topics.forEach(topic -> log.debug("topic: {}", topic)); consumers.forEach(c -> log.debug("consumer: {}", c.getTopic())); @@ -175,7 +175,7 @@ public void testBinaryProtoToGetTopicsOfNamespace() throws Exception { IntStream.range(0, topics.size()).forEach(index -> assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); - ((PatternTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); + ((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic)); // 5. 
produce data for (int i = 0; i < totalMessages / 3; i++) { @@ -235,8 +235,8 @@ public void testTopicsListMinus() throws Exception { List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4); List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6); - List<String> addedNames = PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); - List<String> removedNames = PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); + List<String> addedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(newNames, oldNames); + List<String> removedNames = PatternMultiTopicsConsumerImpl.topicsListsMinus(oldNames, newNames); assertTrue(addedNames.size() == 2 && addedNames.contains(topicName5) && @@ -246,21 +246,21 @@ public void testTopicsListMinus() throws Exception { removedNames.contains(topicName2)); // totally 2 different list, should return content of first lists. - List<String> addedNames2 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); + List<String> addedNames2 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames); assertTrue(addedNames2.size() == 2 && addedNames2.contains(topicName5) && addedNames2.contains(topicName6)); // 2 same list, should return empty list. 
- List<String> addedNames3 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); + List<String> addedNames3 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames); assertEquals(addedNames3.size(), 0); // empty list minus: addedNames2.size = 2, addedNames3.size = 0 - List<String> addedNames4 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); + List<String> addedNames4 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3); assertTrue(addedNames4.size() == addedNames2.size()); addedNames4.forEach(name -> assertTrue(addedNames2.contains(name))); - List<String> addedNames5 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); + List<String> addedNames5 = PatternMultiTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2); assertEquals(addedNames5.size(), 0); } @@ -290,10 +290,10 @@ public void testStartEmptyPatternConsumer() throws Exception { .subscribe(); // 3. verify consumer get methods, to get 0 number of partitions and topics. - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 0); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 0); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 0); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 0); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 0); // 4. create producer String messagePredicate = "my-message-" + key + "-"; @@ -310,15 +310,15 @@ public void testStartEmptyPatternConsumer() throws Exception { // 5. 
call recheckTopics to subscribe each added topics above log.debug("recheck topics change"); - PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); + PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3. - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); // 7. produce data @@ -384,13 +384,13 @@ public void testAutoSubscribePatternConsumer() throws Exception { .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof PatternTopicsConsumerImpl); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. 
verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); // 5. produce data to topic 1,2,3; verify should receive all the message for (int i = 0; i < totalMessages / 3; i++) { @@ -419,12 +419,12 @@ public void testAutoSubscribePatternConsumer() throws Exception { // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4 log.debug("recheck topics change"); - PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); + PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 4); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 10); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 10); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 4); // 8. 
produce data to topic3 and topic4, verify should receive all the message for (int i = 0; i < totalMessages / 2; i++) { @@ -487,13 +487,13 @@ public void testAutoUnbubscribePatternConsumer() throws Exception { .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof PatternTopicsConsumerImpl); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 - assertSame(pattern, ((PatternTopicsConsumerImpl<?>) consumer).getPattern()); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); - assertEquals(((PatternTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); + assertSame(pattern, ((PatternMultiTopicsConsumerImpl<?>) consumer).getPattern()); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getPartitionedTopics().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getConsumers().size(), 6); + assertEquals(((PatternMultiTopicsConsumerImpl<?>) consumer).getTopics().size(), 3); // 5. produce data to topic 1,2,3; verify should receive all the message for (int i = 0; i < totalMessages / 3; i++) { @@ -521,12 +521,12 @@ public void testAutoUnbubscribePatternConsumer() throws Exception { // 7. 
call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 log.debug("recheck topics change"); - PatternTopicsConsumerImpl<byte[]> consumer1 = ((PatternTopicsConsumerImpl<byte[]>) consumer); + PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer); consumer1.run(consumer1.getRecheckPatternTimeout()); Thread.sleep(100); - assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2); - assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2); - assertEquals(((PatternTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1); + assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics().size(), 2); + assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2); + assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 1); // 8. produce data to topic2, verify should receive all the message for (int i = 0; i < totalMessages; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index 7a94cc26e..053cb5ea1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -338,7 +338,7 @@ public void testFailoverAckedNormalTopic() throws Exception { } private static long getUnackedMessagesCountInPartitionedConsumer(Consumer<byte[]> c) { - PartitionedConsumerImpl<byte[]> pc = (PartitionedConsumerImpl<byte[]>) c; + MultiTopicsConsumerImpl<byte[]> pc = (MultiTopicsConsumerImpl<byte[]>) c; return pc.getUnAckedMessageTracker().size() + pc.getConsumers().stream().mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum(); } @@ 
-405,8 +405,8 @@ public void testSharedAckedPartitionedTopic() throws Exception { assertEquals(received, 5); // 7. Simulate ackTimeout - ((PartitionedConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); - ((PartitionedConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); // 8. producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 952dfac22..6aa574ddf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -116,10 +116,10 @@ public void testGetConsumersAndGetTopics() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); - List<String> topics = ((TopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); - List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) consumer).getConsumers(); + List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); + List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); topics.forEach(topic -> log.info("topic: {}", topic)); consumers.forEach(c -> log.info("consumer: {}", c.getTopic())); @@ -127,7 +127,7 @@ public void testGetConsumersAndGetTopics() throws Exception { IntStream.range(0, 6).forEach(index -> assertTrue(topics.get(index).equals(consumers.get(index).getTopic()))); - 
assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); + assertTrue(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); consumer.unsubscribe(); consumer.close(); @@ -167,7 +167,7 @@ public void testSyncProducerAndConsumer() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // 3. producer publish messages for (int i = 0; i < totalMessages / 3; i++) { @@ -228,7 +228,7 @@ public void testAsyncConsumer() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // Asynchronously produce messages List<Future<MessageId>> futures = Lists.newArrayList(); @@ -307,7 +307,7 @@ public void testConsumerUnackedRedelivery() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // 3. 
producer publish messages for (int i = 0; i < totalMessages / 3; i++) { @@ -323,7 +323,7 @@ public void testConsumerUnackedRedelivery() throws Exception { log.debug("Consumer received : " + new String(message.getData())); message = consumer.receive(500, TimeUnit.MILLISECONDS); } - long size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + long size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, totalMessages); @@ -338,7 +338,7 @@ public void testConsumerUnackedRedelivery() throws Exception { message = consumer.receive(500, TimeUnit.MILLISECONDS); } while (message != null); - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); assertEquals(hSet.size(), totalMessages); @@ -361,14 +361,14 @@ public void testConsumerUnackedRedelivery() throws Exception { consumer.acknowledge(message); message = consumer.receive(100, TimeUnit.MILLISECONDS); } - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); assertEquals(received, totalMessages); // 8. Simulate ackTimeout - ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); - ((TopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle(); + ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); // 9. 
producer publish more messages for (int i = 0; i < totalMessages / 3; i++) { @@ -384,7 +384,7 @@ public void testConsumerUnackedRedelivery() throws Exception { log.debug("Consumer received : " + data); message = consumer.receive(100, TimeUnit.MILLISECONDS); } - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.debug(key + " Unacked Message Tracker size is " + size); assertEquals(size, 30); @@ -402,7 +402,7 @@ public void testConsumerUnackedRedelivery() throws Exception { message = consumer.receive(100, TimeUnit.MILLISECONDS); } assertEquals(redelivered, 30); - size = ((TopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); + size = ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size(); log.info(key + " Unacked Message Tracker size is " + size); assertEquals(size, 0); @@ -447,7 +447,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); - assertTrue(consumer instanceof TopicsConsumerImpl); + assertTrue(consumer instanceof MultiTopicsConsumerImpl); // 3. producer publish messages for (int i = 0; i < totalMessages / 3; i++) { @@ -468,7 +468,7 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { assertEquals(messageSet, totalMessages); // 4, unsubscribe topic3 - CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3); + CompletableFuture<Void> unsubFuture = ((MultiTopicsConsumerImpl<byte[]>) consumer).unsubscribeAsync(topicName3); unsubFuture.get(); // 5. producer publish messages @@ -491,15 +491,15 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { assertEquals(messageSet, totalMessages * 2 / 3); // 7. 
use getter to verify internal topics number after un-subscribe topic3 - List<String> topics = ((TopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); - List<ConsumerImpl<byte[]>> consumers = ((TopicsConsumerImpl) consumer).getConsumers(); + List<String> topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); + List<ConsumerImpl<byte[]>> consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 3); assertEquals(consumers.size(), 3); - assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 2); + assertTrue(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 2); // 8. re-subscribe topic3 - CompletableFuture<Void> subFuture = ((TopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3); + CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3); subFuture.get(); // 9. producer publish messages @@ -522,12 +522,12 @@ public void testSubscribeUnsubscribeSingleTopic() throws Exception { assertEquals(messageSet, totalMessages); // 11. 
use getter to verify internal topics number after subscribe topic3 - topics = ((TopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); - consumers = ((TopicsConsumerImpl) consumer).getConsumers(); + topics = ((MultiTopicsConsumerImpl<byte[]>) consumer).getPartitionedTopics(); + consumers = ((MultiTopicsConsumerImpl) consumer).getConsumers(); assertEquals(topics.size(), 6); assertEquals(consumers.size(), 6); - assertTrue(((TopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); + assertTrue(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size() == 3); consumer.unsubscribe(); consumer.close(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2cbd64224..2a88bf051 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -914,7 +914,7 @@ protected synchronized void messageProcessed(Message<?> msg) { id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex()); } if (partitionIndex != -1) { - // we should no longer track this message, PartitionedConsumerImpl will take care from now onwards + // we should no longer track this message, TopicsConsumer will take care from now onwards unAckedMessageTracker.remove(id); } else { unAckedMessageTracker.add(id); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java similarity index 74% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 1089cef53..6da72ec08 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -56,7 +56,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TopicsConsumerImpl<T> extends ConsumerBase<T> { +public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { // All topics should be in same namespace protected NamespaceName namespaceName; @@ -76,15 +76,15 @@ private final int sharedQueueResumeThreshold; // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. - AtomicInteger numberTopicPartitions; + AtomicInteger allTopicPartitionsNumber; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStatsRecorder stats; private final UnAckedMessageTracker unAckedMessageTracker; private final ConsumerConfigurationData<T> internalConfig; - TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, - CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { + MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, + CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema); @@ -95,7 +95,7 @@ this.consumers = new ConcurrentHashMap<>(); this.pausedConsumers = new ConcurrentLinkedQueue<>(); this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; - this.numberTopicPartitions = new AtomicInteger(0); + this.allTopicPartitionsNumber = new AtomicInteger(0); if (conf.getAckTimeoutMillis() != 0) { this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis()); @@ -109,7 +109,7 @@ if (conf.getTopicNames().isEmpty()) { this.namespaceName = null; setState(State.Ready); - subscribeFuture().complete(TopicsConsumerImpl.this); + 
subscribeFuture().complete(MultiTopicsConsumerImpl.this); return; } @@ -122,15 +122,15 @@ FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> { try { - if (numberTopicPartitions.get() > maxReceiverQueueSize) { - setMaxReceiverQueueSize(numberTopicPartitions.get()); + if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { + setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); } setState(State.Ready); // We have successfully created N consumers, so we can start receiving messages now startReceivingMessages(consumers.values().stream().collect(Collectors.toList())); - subscribeFuture().complete(TopicsConsumerImpl.this); + subscribeFuture().complete(MultiTopicsConsumerImpl.this); log.info("[{}] [{}] Created topics consumer with {} sub-consumers", - topic, subscription, numberTopicPartitions.get()); + topic, subscription, allTopicPartitionsNumber.get()); } catch (PulsarClientException e) { log.warn("[{}] Failed startReceivingMessages while subscribe topics: {}", topic, e.getMessage()); subscribeFuture.completeExceptionally(e); @@ -245,7 +245,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) { } else { // Enqueue the message so that it can be retrieved when application calls receive() // Waits for the queue to have space for the message - // This should never block cause TopicsConsumerImpl should always use GrowableArrayBlockingQueue + // This should never block cause MultiTopicsConsumerImpl should always use GrowableArrayBlockingQueue incomingMessages.put(topicMessage); } } catch (InterruptedException e) { @@ -271,7 +271,7 @@ private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) { log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message.getMessageId()); } - listener.received(TopicsConsumerImpl.this, msg); + listener.received(MultiTopicsConsumerImpl.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, 
subscription, message, t); @@ -613,117 +613,172 @@ private boolean topicNameValid(String topicName) { } CompletableFuture<Void> subscribeResult = new CompletableFuture<>(); - final AtomicInteger partitionNumber = new AtomicInteger(0); - client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("Received topic {} metadata.partitions: {}", topicName, metadata.partitions); - } + client.getPartitionedTopicMetadata(topicName) + .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions)) + .exceptionally(ex1 -> { + log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage()); + subscribeResult.completeExceptionally(ex1); + return null; + }); - List<CompletableFuture<Consumer<T>>> futureList; - - if (metadata.partitions > 1) { - this.topics.putIfAbsent(topicName, metadata.partitions); - numberTopicPartitions.addAndGet(metadata.partitions); - partitionNumber.addAndGet(metadata.partitions); - - futureList = IntStream - .range(0, partitionNumber.get()) - .mapToObj( - partitionIndex -> { - String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); - CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, internalConfig, - client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema); - consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); - return subFuture; - }) - .collect(Collectors.toList()); - } else { - this.topics.putIfAbsent(topicName, 1); - numberTopicPartitions.incrementAndGet(); - partitionNumber.incrementAndGet(); + return subscribeResult; + } - CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); - ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig, - client.externalExecutorProvider().getExecutor(), 0, subFuture, schema); - 
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + // create consumer for a single topic with already known partitions. + // first create a consumer with no topic, then do subscription for already know partitionedTopic. + public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client, + ConsumerConfigurationData<T> conf, + ExecutorService listenerExecutor, + CompletableFuture<Consumer<T>> subscribeFuture, + int numPartitions, + Schema<T> schema) { + checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer"); + + // get topic name, then remove it from conf, so constructor will create a consumer with no topic. + ConsumerConfigurationData cloneConf = conf.clone(); + String topicName = cloneConf.getSingleTopic(); + cloneConf.getTopicNames().remove(topicName); + + CompletableFuture<Consumer> future = new CompletableFuture<>(); + MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema); + + future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions)) + .thenRun(()-> subscribeFuture.complete(consumer)) + .exceptionally(e -> { + log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}", + topicName, numPartitions, e); + subscribeFuture.completeExceptionally(((Throwable)e).getCause()); + return null; + });; + return consumer; + } - futureList = Collections.singletonList(subFuture); - } + // subscribe one more given topic, but already know the numberPartitions + private CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) { + if (!topicNameValid(topicName)) { + return FutureUtil.failedFuture( + new PulsarClientException.AlreadyClosedException("Topic name not valid")); + } - FutureUtil.waitForAll(futureList) - .thenAccept(finalFuture -> { - try { - if (numberTopicPartitions.get() > maxReceiverQueueSize) { - setMaxReceiverQueueSize(numberTopicPartitions.get()); 
- } - int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); - checkState(numberTopicPartitions.get() == numTopics, - "numberTopicPartitions " + numberTopicPartitions.get() - + " not equals expected: " + numTopics); - - // We have successfully created new consumers, so we can start receiving messages for them - startReceivingMessages( - consumers.values().stream() - .filter(consumer1 -> { - String consumerTopicName = consumer1.getTopic(); - if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( - TopicName.get(topicName).getPartitionedTopicName().toString())) { - return true; - } else { - return false; - } - }) - .collect(Collectors.toList())); - - subscribeResult.complete(null); - log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, numberTopicPartitions {}", - topic, subscription, topicName, numberTopicPartitions.get()); - if (this.namespaceName == null) { - this.namespaceName = TopicName.get(topicName).getNamespaceObject(); - } - return; - } catch (PulsarClientException e) { - handleSubscribeOneTopicError(topicName, e); - subscribeResult.completeExceptionally(e); - } - }) - .exceptionally(ex -> { - handleSubscribeOneTopicError(topicName, ex); - subscribeResult.completeExceptionally(ex); - return null; - }); - }).exceptionally(ex1 -> { - log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage()); - subscribeResult.completeExceptionally(ex1); - return null; - }); + if (getState() == State.Closing || getState() == State.Closed) { + return FutureUtil.failedFuture( + new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed")); + } + + CompletableFuture<Void> subscribeResult = new CompletableFuture<>(); + subscribeTopicPartitions(subscribeResult, topicName, numberPartitions); return subscribeResult; } - // handling failure during subscribe new topic, unsubscribe success created partitions - private void handleSubscribeOneTopicError(String topicName, 
Throwable error) { - log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer ", topic, topicName, error.getMessage()); + private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int partitionNumber) { + if (log.isDebugEnabled()) { + log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, partitionNumber); + } - consumers.values().stream().filter(consumer1 -> { - String consumerTopicName = consumer1.getTopic(); - if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) { - return true; - } else { - return false; - } - }).forEach(consumer2 -> { - consumer2.closeAsync().handle((ok, closeException) -> { - consumer2.subscribeFuture().completeExceptionally(error); + List<CompletableFuture<Consumer<T>>> futureList; + + if (partitionNumber > 1) { + this.topics.putIfAbsent(topicName, partitionNumber); + allTopicPartitionsNumber.addAndGet(partitionNumber); + + futureList = IntStream + .range(0, partitionNumber) + .mapToObj( + partitionIndex -> { + String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString(); + CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); + ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, internalConfig, + client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema); + consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + return subFuture; + }) + .collect(Collectors.toList()); + } else { + this.topics.putIfAbsent(topicName, 1); + allTopicPartitionsNumber.incrementAndGet(); + + CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>(); + ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig, + client.externalExecutorProvider().getExecutor(), 0, subFuture, schema); + consumers.putIfAbsent(newConsumer.getTopic(), newConsumer); + + futureList = Collections.singletonList(subFuture); + } + + FutureUtil.waitForAll(futureList) + 
.thenAccept(finalFuture -> { + try { + if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) { + setMaxReceiverQueueSize(allTopicPartitionsNumber.get()); + } + int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum(); + checkState(allTopicPartitionsNumber.get() == numTopics, + "allTopicPartitionsNumber " + allTopicPartitionsNumber.get() + + " not equals expected: " + numTopics); + + // We have successfully created new consumers, so we can start receiving messages for them + startReceivingMessages( + consumers.values().stream() + .filter(consumer1 -> { + String consumerTopicName = consumer1.getTopic(); + if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals( + TopicName.get(topicName).getPartitionedTopicName().toString())) { + return true; + } else { + return false; + } + }) + .collect(Collectors.toList())); + + subscribeResult.complete(null); + log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}", + topic, subscription, topicName, partitionNumber, allTopicPartitionsNumber.get()); + if (this.namespaceName == null) { + this.namespaceName = TopicName.get(topicName).getNamespaceObject(); + } + return; + } catch (PulsarClientException e) { + handleSubscribeOneTopicError(topicName, e, subscribeResult); + } + }) + .exceptionally(ex -> { + handleSubscribeOneTopicError(topicName, ex, subscribeResult); return null; }); - consumers.remove(consumer2.getTopic()); - }); + } - topics.remove(topicName); - checkState(numberTopicPartitions.get() == consumers.values().size()); + // handling failure during subscribe new topic, unsubscribe success created partitions + private void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) { + log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage()); + + client.externalExecutorProvider().getExecutor().submit(() -> { + AtomicInteger 
toCloseNum = new AtomicInteger(0); + consumers.values().stream().filter(consumer1 -> { + String consumerTopicName = consumer1.getTopic(); + if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) { + return true; + } else { + return false; + } + }).forEach(consumer2 -> { + toCloseNum.incrementAndGet(); + consumer2.closeAsync().whenComplete((r, ex) -> { + consumer2.subscribeFuture().completeExceptionally(error); + allTopicPartitionsNumber.decrementAndGet(); + consumers.remove(consumer2.getTopic()); + if (toCloseNum.decrementAndGet() == 0) { + log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}", + topic, topicName, error.getMessage()); + topics.remove(topicName); + checkState(allTopicPartitionsNumber.get() == consumers.values().size()); + subscribeFuture.completeExceptionally(error); + } + return; + }); + }); + }); } // un-subscribe a given topic @@ -757,15 +812,15 @@ private void handleSubscribeOneTopicError(String topicName, Throwable error) { consumersToUnsub.forEach(consumer1 -> { consumers.remove(consumer1.getTopic()); pausedConsumers.remove(consumer1); - numberTopicPartitions.decrementAndGet(); + allTopicPartitionsNumber.decrementAndGet(); }); topics.remove(topicName); ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName); unsubscribeFuture.complete(null); - log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, numberTopicPartitions: {}", - topicName, subscription, consumerName, numberTopicPartitions); + log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, allTopicPartitionsNumber: {}", + topicName, subscription, consumerName, allTopicPartitionsNumber); } else { unsubscribeFuture.completeExceptionally(ex); setState(State.Failed); @@ -792,5 +847,5 @@ private void handleSubscribeOneTopicError(String topicName, Throwable error) { return consumers.values().stream().collect(Collectors.toList()); } - private static final Logger log = 
LoggerFactory.getLogger(TopicsConsumerImpl.class); + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java deleted file mode 100644 index 9c952dd22..000000000 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ /dev/null @@ -1,553 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.client.impl; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; - -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -public class PartitionedConsumerImpl<T> extends ConsumerBase<T> { - - private final List<ConsumerImpl<T>> consumers; - - // Queue of partition consumers on which we have stopped calling receiveAsync() because the - // shared incoming queue was full - private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers; - - // Threshold for the shared queue. 
When the size of the shared queue goes below the threshold, we are going to - // resume receiving from the paused consumer partitions - private final int sharedQueueResumeThreshold; - - private final int numPartitions; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final ConsumerStatsRecorderImpl stats; - private final UnAckedMessageTracker unAckedMessageTracker; - - PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, int numPartitions, - ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) { - super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), - listenerExecutor, subscribeFuture, schema); - this.consumers = Lists.newArrayListWithCapacity(numPartitions); - this.pausedConsumers = new ConcurrentLinkedQueue<>(); - this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; - this.numPartitions = numPartitions; - - if (conf.getAckTimeoutMillis() != 0) { - this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()); - } else { - this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; - } - - stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? 
new ConsumerStatsRecorderImpl() : null; - checkArgument(conf.getReceiverQueueSize() > 0, - "Receiver queue size needs to be greater than 0 for Partitioned Topics"); - start(); - } - - private void start() { - AtomicReference<Throwable> subscribeFail = new AtomicReference<Throwable>(); - AtomicInteger completed = new AtomicInteger(); - ConsumerConfigurationData<T> internalConfig = getInternalConsumerConfig(); - for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) { - String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString(); - ConsumerImpl<T> consumer = new ConsumerImpl<>(client, partitionName, internalConfig, - client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<>(), schema); - consumers.add(consumer); - consumer.subscribeFuture().handle((cons, subscribeException) -> { - if (subscribeException != null) { - setState(State.Failed); - subscribeFail.compareAndSet(null, subscribeException); - client.cleanupConsumer(this); - } - if (completed.incrementAndGet() == numPartitions) { - if (subscribeFail.get() == null) { - try { - // We have successfully created N consumers, so we can start receiving messages now - starReceivingMessages(); - setState(State.Ready); - subscribeFuture().complete(PartitionedConsumerImpl.this); - log.info("[{}] [{}] Created partitioned consumer", topic, subscription); - return null; - } catch (PulsarClientException e) { - subscribeFail.set(e); - } - } - closeAsync().handle((ok, closeException) -> { - subscribeFuture().completeExceptionally(subscribeFail.get()); - client.cleanupConsumer(this); - return null; - }); - log.error("[{}] [{}] Could not create partitioned consumer.", topic, subscription, - subscribeFail.get().getCause()); - } - return null; - }); - } - } - - private void starReceivingMessages() throws PulsarClientException { - for (ConsumerImpl<T> consumer : consumers) { - consumer.sendFlowPermitsToBroker(consumer.getConnectionHandler().cnx(), 
conf.getReceiverQueueSize()); - receiveMessageFromConsumer(consumer); - } - } - - private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) { - consumer.receiveAsync().thenAccept(message -> { - // Process the message, add to the queue and trigger listener or async callback - messageReceived(message); - - // we're modifying pausedConsumers - lock.writeLock().lock(); - try { - int size = incomingMessages.size(); - if (size >= maxReceiverQueueSize - || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { - // mark this consumer to be resumed later: if No more space left in shared queue, - // or if any consumer is already paused (to create fair chance for already paused consumers) - pausedConsumers.add(consumer); - } else { - // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid - // recursion and stack overflow - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); - } - } finally { - lock.writeLock().unlock(); - } - }); - } - - private void resumeReceivingFromPausedConsumersIfNeeded() { - lock.readLock().lock(); - try { - if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { - while (true) { - ConsumerImpl<T> consumer = pausedConsumers.poll(); - if (consumer == null) { - break; - } - - // if messages are readily available on consumer we will attempt to writeLock on the same thread - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); - } - } - } finally { - lock.readLock().unlock(); - } - } - - @Override - protected Message<T> internalReceive() throws PulsarClientException { - Message<T> message; - try { - message = incomingMessages.take(); - unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); - resumeReceivingFromPausedConsumersIfNeeded(); - return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - 
@Override - protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException { - Message<T> message; - try { - message = incomingMessages.poll(timeout, unit); - if (message != null) { - unAckedMessageTracker.add(message.getMessageId()); - } - resumeReceivingFromPausedConsumersIfNeeded(); - return message; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new PulsarClientException(e); - } - } - - @Override - protected CompletableFuture<Message<T>> internalReceiveAsync() { - CompletableFuture<Message<T>> result = new CompletableFuture<>(); - Message<T> message; - try { - lock.writeLock().lock(); - message = incomingMessages.poll(0, TimeUnit.SECONDS); - if (message == null) { - pendingReceives.add(result); - } else { - unAckedMessageTracker.add(message.getMessageId()); - resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - result.completeExceptionally(new PulsarClientException(e)); - } finally { - lock.writeLock().unlock(); - } - - return result; - } - - @Override - protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType, - Map<String,Long> properties) { - checkArgument(messageId instanceof MessageIdImpl); - - if (getState() != State.Ready) { - return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); - } - - if (ackType == AckType.Cumulative) { - return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException( - "Cumulative acknowledge not supported for partitioned topics")); - } else { - - ConsumerImpl<T> consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex()); - return consumer.doAcknowledge(messageId, ackType, properties).thenRun(() -> - unAckedMessageTracker.remove(messageId)); - } - - } - - @Override - public CompletableFuture<Void> unsubscribeAsync() { - if (getState() == State.Closing || getState() == 
State.Closed) { - return FutureUtil.failedFuture( - new PulsarClientException.AlreadyClosedException("Partitioned Consumer was already closed")); - } - setState(State.Closing); - - AtomicReference<Throwable> unsubscribeFail = new AtomicReference<Throwable>(); - AtomicInteger completed = new AtomicInteger(numPartitions); - CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>(); - for (Consumer<T> consumer : consumers) { - if (consumer != null) { - consumer.unsubscribeAsync().handle((unsubscribed, ex) -> { - if (ex != null) { - unsubscribeFail.compareAndSet(null, ex); - } - if (completed.decrementAndGet() == 0) { - if (unsubscribeFail.get() == null) { - setState(State.Closed); - unAckedMessageTracker.close(); - unsubscribeFuture.complete(null); - log.info("[{}] [{}] Unsubscribed Partitioned Consumer", topic, subscription); - } else { - setState(State.Failed); - unsubscribeFuture.completeExceptionally(unsubscribeFail.get()); - log.error("[{}] [{}] Could not unsubscribe Partitioned Consumer", topic, subscription, - unsubscribeFail.get().getCause()); - } - } - - return null; - }); - } - - } - - return unsubscribeFuture; - } - - @Override - public CompletableFuture<Void> closeAsync() { - if (getState() == State.Closing || getState() == State.Closed) { - unAckedMessageTracker.close(); - return CompletableFuture.completedFuture(null); - } - setState(State.Closing); - - AtomicReference<Throwable> closeFail = new AtomicReference<Throwable>(); - AtomicInteger completed = new AtomicInteger(numPartitions); - CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - for (Consumer<T> consumer : consumers) { - if (consumer != null) { - consumer.closeAsync().handle((closed, ex) -> { - if (ex != null) { - closeFail.compareAndSet(null, ex); - } - if (completed.decrementAndGet() == 0) { - if (closeFail.get() == null) { - setState(State.Closed); - unAckedMessageTracker.close(); - closeFuture.complete(null); - log.info("[{}] [{}] Closed Partitioned Consumer", 
topic, subscription); - client.cleanupConsumer(this); - // fail all pending-receive futures to notify application - failPendingReceive(); - } else { - setState(State.Failed); - closeFuture.completeExceptionally(closeFail.get()); - log.error("[{}] [{}] Could not close Partitioned Consumer", topic, subscription, - closeFail.get().getCause()); - } - } - - return null; - }); - } - - } - - return closeFuture; - } - - private void failPendingReceive() { - lock.readLock().lock(); - try { - if (listenerExecutor != null && !listenerExecutor.isShutdown()) { - while (!pendingReceives.isEmpty()) { - CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll(); - if (receiveFuture != null) { - receiveFuture.completeExceptionally( - new PulsarClientException.AlreadyClosedException("Consumer is already closed")); - } else { - break; - } - } - } - } finally { - lock.readLock().unlock(); - } - } - - @Override - public boolean isConnected() { - return consumers.stream().allMatch(ConsumerImpl::isConnected); - } - - void messageReceived(Message<T> message) { - lock.writeLock().lock(); - try { - unAckedMessageTracker.add(message.getMessageId()); - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Received message from partitioned-consumer {}", topic, subscription, message.getMessageId()); - } - // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue - if (!pendingReceives.isEmpty()) { - CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll(); - listenerExecutor.execute(() -> receivedFuture.complete(message)); - } else { - // Enqueue the message so that it can be retrieved when application calls receive() - // Waits for the queue to have space for the message - // This should never block cause PartitonedConsumerImpl should always use GrowableArrayBlockingQueue - incomingMessages.put(message); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - lock.writeLock().unlock(); - } - - 
if (listener != null) { - // Trigger the notification on the message listener in a separate thread to avoid blocking the networking - // thread while the message processing happens - listenerExecutor.execute(() -> { - Message<T> msg; - try { - msg = internalReceive(); - } catch (PulsarClientException e) { - log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; - } - - try { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, message.getMessageId()); - } - listener.received(PartitionedConsumerImpl.this, msg); - } catch (Throwable t) { - log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, message, - t); - } - }); - } - } - - @Override - String getHandlerName() { - return subscription; - } - - private ConsumerConfigurationData<T> getInternalConsumerConfig() { - ConsumerConfigurationData<T> internalConsumerConfig = new ConsumerConfigurationData<>(); - internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize()); - internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName()); - internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType()); - internalConsumerConfig.setConsumerName(consumerName); - internalConsumerConfig.setAcknowledgementsGroupTimeMicros(conf.getAcknowledgementsGroupTimeMicros()); - internalConsumerConfig.setPriorityLevel(conf.getPriorityLevel()); - internalConsumerConfig.setProperties(conf.getProperties()); - internalConsumerConfig.setReadCompacted(conf.isReadCompacted()); - if (null != conf.getConsumerEventListener()) { - internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); - } - - int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), - conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); - internalConsumerConfig.setReceiverQueueSize(receiverQueueSize); - - if (conf.getCryptoKeyReader() != null) { - 
internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader()); - internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction()); - } - if (conf.getAckTimeoutMillis() != 0) { - internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis()); - } - - return internalConsumerConfig; - } - - @Override - public void redeliverUnacknowledgedMessages() { - synchronized (this) { - for (ConsumerImpl<T> c : consumers) { - c.redeliverUnacknowledgedMessages(); - } - incomingMessages.clear(); - unAckedMessageTracker.clear(); - resumeReceivingFromPausedConsumersIfNeeded(); - } - } - - @Override - public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) { - checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl); - if (conf.getSubscriptionType() != SubscriptionType.Shared) { - // We cannot redeliver single messages if subscription type is not Shared - redeliverUnacknowledgedMessages(); - return; - } - removeExpiredMessagesFromQueue(messageIds); - messageIds.stream() - .map(messageId -> (MessageIdImpl)messageId) - .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet())) - .forEach((partitionIndex, messageIds1) -> - consumers.get(partitionIndex).redeliverUnacknowledgedMessages( - messageIds1.stream().map(mid -> (MessageId)mid).collect(Collectors.toSet()))); - resumeReceivingFromPausedConsumersIfNeeded(); - } - - @Override - public void seek(MessageId messageId) throws PulsarClientException { - try { - seekAsync(messageId).get(); - } catch (ExecutionException e) { - throw new PulsarClientException(e.getCause()); - } catch (InterruptedException e) { - throw new PulsarClientException(e); - } - } - - @Override - public CompletableFuture<Void> seekAsync(MessageId messageId) { - return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on partitioned topics")); - } - - List<ConsumerImpl<T>> getConsumers() { - return consumers; - } - - @Override - public int 
getAvailablePermits() { - return consumers.stream().mapToInt(ConsumerImpl::getAvailablePermits).sum(); - } - - @Override - public boolean hasReachedEndOfTopic() { - return consumers.stream().allMatch(Consumer::hasReachedEndOfTopic); - } - - @Override - public int numMessagesInQueue() { - return incomingMessages.size() + consumers.stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum(); - } - - @Override - public synchronized ConsumerStatsRecorderImpl getStats() { - if (stats == null) { - return null; - } - stats.reset(); - for (int i = 0; i < numPartitions; i++) { - stats.updateCumulativeStats(consumers.get(i).getStats()); - } - return stats; - } - - public UnAckedMessageTracker getUnAckedMessageTracker() { - return unAckedMessageTracker; - } - - private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) { - Message<T> peek = incomingMessages.peek(); - if (peek != null) { - if (!messageIds.contains(peek.getMessageId())) { - // first message is not expired, then no message is expired in queue. 
- return; - } - - // try not to remove elements that are added while we remove - Message<T> message = incomingMessages.poll(); - while (message != null) { - MessageIdImpl messageId = (MessageIdImpl) message.getMessageId(); - if (!messageIds.contains(messageId)) { - messageIds.add(messageId); - break; - } - message = incomingMessages.poll(); - } - } - } - - private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class); -} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java similarity index 89% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index f9cf550e5..d0b0c606c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -41,17 +41,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PatternTopicsConsumerImpl<T> extends TopicsConsumerImpl<T> implements TimerTask { +public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T> implements TimerTask { private final Pattern topicsPattern; private final TopicsChangedListener topicsChangeListener; private volatile Timeout recheckPatternTimeout = null; - public PatternTopicsConsumerImpl(Pattern topicsPattern, - PulsarClientImpl client, - ConsumerConfigurationData<T> conf, - ExecutorService listenerExecutor, - CompletableFuture<Consumer<T>> subscribeFuture, - Schema<T> schema) { + public PatternMultiTopicsConsumerImpl(Pattern topicsPattern, + PulsarClientImpl client, + ConsumerConfigurationData<T> conf, + ExecutorService listenerExecutor, + CompletableFuture<Consumer<T>> subscribeFuture, + Schema<T> schema) { 
super(client, conf, listenerExecutor, subscribeFuture, schema); this.topicsPattern = topicsPattern; @@ -86,7 +86,7 @@ public void run(Timeout timeout) throws Exception { } List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern); - List<String> oldTopics = PatternTopicsConsumerImpl.this.getTopics(); + List<String> oldTopics = PatternMultiTopicsConsumerImpl.this.getTopics(); futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics))); futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics))); @@ -100,7 +100,7 @@ public void run(Timeout timeout) throws Exception { }); // schedule the next re-check task - client.timer().newTimeout(PatternTopicsConsumerImpl.this, + client.timer().newTimeout(PatternMultiTopicsConsumerImpl.this, Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES); } @@ -109,9 +109,9 @@ public Pattern getPattern() { } interface TopicsChangedListener { - // unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics. + // unsubscribe and delete ConsumerImpl in the `consumers` map in `MultiTopicsConsumerImpl` based on added topics. CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics); - // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`. + // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `MultiTopicsConsumerImpl`. 
CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics); } @@ -181,5 +181,5 @@ Timeout getRecheckPatternTimeout() { return recheckPatternTimeout; } - private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class); + private static final Logger log = LoggerFactory.getLogger(PatternMultiTopicsConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 47e953b5f..dd07cf9fc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -384,8 +384,8 @@ public ClientConfigurationData getConfiguration() { // gets the next single threaded executor from the list of executors ExecutorService listenerThread = externalExecutorProvider.getExecutor(); if (metadata.partitions > 1) { - consumer = new PartitionedConsumerImpl<>(PulsarClientImpl.this, conf, metadata.partitions, listenerThread, - consumerSubscribedFuture, schema); + consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf, + listenerThread, consumerSubscribedFuture, metadata.partitions, schema); } else { consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1, consumerSubscribedFuture, schema); @@ -406,7 +406,7 @@ public ClientConfigurationData getConfiguration() { private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); - ConsumerBase<T> consumer = new TopicsConsumerImpl<>(PulsarClientImpl.this, conf, + ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf, externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema); synchronized (consumers) { @@ -436,7 +436,7 @@ 
public ClientConfigurationData getConfiguration() { List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern()); conf.getTopicNames().addAll(topicsList); - ConsumerBase<T> consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(), + ConsumerBase<T> consumer = new PatternMultiTopicsConsumerImpl<>(conf.getTopicsPattern(), PulsarClientImpl.this, conf, externalExecutorProvider.getExecutor(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 6f3218821..5ae452ee5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -27,7 +27,7 @@ private final String topicName; private final Message<T> msg; - private final MessageId msgId; + private final TopicMessageIdImpl msgId; TopicMessageImpl(String topicName, Message<T> msg) { @@ -49,6 +49,10 @@ public MessageId getMessageId() { return msgId; } + public MessageId getInnerMessageId() { + return msgId.getInnerMessageId(); + } + @Override public Map<String, String> getProperties() { return msg.getProperties(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services