This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8688b67 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) 8688b67 is described below commit 8688b6733177ee4a48d805df18a1d82865cf7320 Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Tue Aug 28 02:38:41 2018 +0800 Issue #2330: change getTopicName in MultiTopicsConsumer (#2346) * change getTopicName in MultiTopicsConsumer * change following sijie's comments * keep both topicName and topicPartitonName in consumer to avoid new string --- .../apache/pulsar/client/impl/ConsumerImpl.java | 8 +++++++ .../client/impl/MultiTopicsConsumerImpl.java | 9 ++++---- .../pulsar/client/impl/TopicMessageIdImpl.java | 27 ++++++++++++++++++---- .../pulsar/client/impl/TopicMessageImpl.java | 20 ++++++++++++---- .../client/impl/UnAckedTopicMessageTracker.java | 4 ++-- .../pulsar/client/impl/MessageIdCompareToTest.java | 6 ++++- .../tests/integration/semantics/SemanticsTest.java | 2 +- 7 files changed, 60 insertions(+), 16 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index da04534..fe37b69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -131,6 +131,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private final SubscriptionInitialPosition subscriptionInitialPosition; private final ConnectionHandler connectionHandler; + private final String topicNameWithoutPartition; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -203,6 +205,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle NonPersistentAcknowledgmentGroupingTracker.of(); } + topicNameWithoutPartition = topicName.getPartitionedTopicName(); + grabCnx(); } @@ -1458,6 +1462,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle this.connectionHandler.grabCnx(); } + public String getTopicNameWithoutPartition() { + return topicNameWithoutPartition; + } + private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 799bec8..f1cb9cf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -231,7 +231,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { checkArgument(message instanceof MessageImpl); lock.writeLock().lock(); try { - TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(), message); + TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>( + consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message); unAckedMessageTracker.add(topicMessage.getMessageId()); if (log.isDebugEnabled()) { @@ -370,7 +371,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } if (ackType == AckType.Cumulative) { - Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); + Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName()); if (individualConsumer != null) { MessageId innerId = topicMessageId.getInnerMessageId(); return individualConsumer.acknowledgeCumulativeAsync(innerId); @@ -378,7 +379,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); } } else { - ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName()); + ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName()); MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) @@ -511,7 +512,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } removeExpiredMessagesFromQueue(messageIds); messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId) - .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet())) + .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet())) .forEach((topicName, messageIds1) -> consumers.get(topicName) .redeliverUnacknowledgedMessages(messageIds1.stream() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index 071b804..dd1b37d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,20 +18,39 @@ */ package org.apache.pulsar.client.impl; +import static com.google.common.base.Preconditions.checkState; +import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX; + import java.util.Objects; import org.apache.pulsar.client.api.MessageId; public class TopicMessageIdImpl implements MessageId { + + /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ + private final String topicPartitionName; private final String topicName; private final MessageId messageId; - TopicMessageIdImpl(String topicName, MessageId messageId) { - this.topicName = topicName; + TopicMessageIdImpl(String topicPartitionName, String topicName, MessageId messageId) { this.messageId = messageId; + this.topicPartitionName = topicPartitionName; + this.topicName = topicName; } + /** + * Get the topic name without partition part of this message. + * @return the name of the topic on which this message was published + */ public String getTopicName() { - return topicName; + return this.topicName; + } + + /** + * Get the topic name which contains partition part for this message. + * @return the topic name which contains Partition part + */ + public String getTopicPartitionName() { + return this.topicPartitionName; } public MessageId getInnerMessageId() { @@ -49,7 +68,7 @@ public class TopicMessageIdImpl implements MessageId { return false; } TopicMessageIdImpl other = (TopicMessageIdImpl) obj; - return Objects.equals(topicName, other.topicName) + return Objects.equals(topicPartitionName, other.topicPartitionName) && Objects.equals(messageId, other.messageId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index 4f5ac13..eae02b0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -21,32 +21,44 @@ package org.apache.pulsar.client.impl; import java.util.Map; import java.util.Optional; - import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.EncryptionContext; public class TopicMessageImpl<T> implements Message<T> { + /** This topicPartitionName is get from ConsumerImpl, it contains partition part. */ + private final String topicPartitionName; private final String topicName; private final Message<T> msg; private final TopicMessageIdImpl messageId; - TopicMessageImpl(String topicName, + TopicMessageImpl(String topicPartitionName, + String topicName, Message<T> msg) { + this.topicPartitionName = topicPartitionName; this.topicName = topicName; + this.msg = msg; - this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId()); + this.messageId = new TopicMessageIdImpl(topicPartitionName, topicName, msg.getMessageId()); } /** - * Get the topic name of this message. + * Get the topic name without partition part of this message. * @return the name of the topic on which this message was published */ public String getTopicName() { return topicName; } + /** + * Get the topic name which contains partition part for this message. + * @return the topic name which contains Partition part + */ + public String getTopicPartitionName() { + return topicPartitionName; + } + @Override public MessageId getMessageId() { return messageId; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java index dfc9257..f500fda 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java @@ -32,12 +32,12 @@ public class UnAckedTopicMessageTracker extends UnAckedMessageTracker { int currentSetRemovedMsgCount = currentSet.removeIf(m -> { checkState(m instanceof TopicMessageIdImpl, "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicName().contains(topicName); + return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); }); int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> { checkState(m instanceof TopicMessageIdImpl, "message should be of type TopicMessageIdImpl"); - return ((TopicMessageIdImpl)m).getTopicName().contains(topicName); + return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName); }); return currentSetRemovedMsgCount + oldSetRemovedMsgCount; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 78af44e..d032c23 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; @@ -122,12 +121,15 @@ public class MessageIdCompareToTest { public void testMessageIdImplCompareToTopicMessageId() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 566, 789)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 567, 789)); TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(messageIdImpl)); assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); @@ -144,9 +146,11 @@ public class MessageIdCompareToTest { BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new MessageIdImpl(123L, 345L, 566)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", new MessageIdImpl(123L, 345L, 567)); assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java index 2078726..84dfad1 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/semantics/SemanticsTest.java @@ -245,7 +245,7 @@ public class SemanticsTest extends PulsarClusterTestBase { Message<String> m = consumer.receive(); int topicIdx; if (numTopics > 1) { - String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicName(); + String topic = ((TopicMessageIdImpl) m.getMessageId()).getTopicPartitionName(); String[] topicParts = StringUtils.split(topic, '-'); topicIdx = Integer.parseInt(topicParts[topicParts.length - 1]);