This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 374d2ac Improve message id comparison (#1982) 374d2ac is described below commit 374d2ac51c99a61c9895b1fec095bbb0439b722c Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Mon Jun 18 18:29:10 2018 -0700 Improve message id comparison (#1982) *Motivation* Fixes #1918 *Solution* Make sure `MessageIdImpl` and `BatchMessageIdImpl` recognize TopicMessageIdImpl --- .../pulsar/client/impl/BatchMessageIdImpl.java | 2 ++ .../apache/pulsar/client/impl/MessageIdImpl.java | 21 ++++++----- .../pulsar/client/impl/TopicMessageIdImpl.java | 11 ++++++ .../pulsar/client/impl/MessageIdCompareToTest.java | 42 ++++++++++++++++++++++ 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index d87a6ab..78109c4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -72,6 +72,8 @@ public class BatchMessageIdImpl extends MessageIdImpl { } else { return res; } + } else if (o instanceof TopicMessageIdImpl) { + return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); } else { throw new IllegalArgumentException( "expected BatchMessageIdImpl object. Got instance of " + o.getClass().getName()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 4810f0c..41cc9c9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -73,6 +73,9 @@ public class MessageIdImpl implements MessageId { if (obj instanceof MessageIdImpl) { MessageIdImpl other = (MessageIdImpl) obj; return ledgerId == other.ledgerId && entryId == other.entryId && partitionIndex == other.partitionIndex; + } else if (obj instanceof BatchMessageIdImpl){ + BatchMessageIdImpl other = (BatchMessageIdImpl) obj; + return other.equals(this); } return false; } @@ -148,16 +151,18 @@ public class MessageIdImpl implements MessageId { @Override public int compareTo(MessageId o) { - if (!(o instanceof MessageIdImpl)) { + if (o instanceof MessageIdImpl) { + MessageIdImpl other = (MessageIdImpl) o; + return ComparisonChain.start() + .compare(this.ledgerId, other.ledgerId) + .compare(this.entryId, other.entryId) + .compare(this.getPartitionIndex(), other.getPartitionIndex()) + .result(); + } else if (o instanceof TopicMessageIdImpl) { + return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); + } else { throw new IllegalArgumentException( "expected MessageIdImpl object. Got instance of " + o.getClass().getName()); } - - MessageIdImpl other = (MessageIdImpl) o; - return ComparisonChain.start() - .compare(this.ledgerId, other.ledgerId) - .compare(this.entryId, other.entryId) - .compare(this.getPartitionIndex(), other.getPartitionIndex()) - .result(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java index c7cc453..071b804 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.util.Objects; import org.apache.pulsar.client.api.MessageId; public class TopicMessageIdImpl implements MessageId { @@ -43,6 +44,16 @@ public class TopicMessageIdImpl implements MessageId { } @Override + public boolean equals(Object obj) { + if (!(obj instanceof TopicMessageIdImpl)) { + return false; + } + TopicMessageIdImpl other = (TopicMessageIdImpl) obj; + return Objects.equals(topicName, other.topicName) + && Objects.equals(messageId, other.messageId); + } + + @Override public int compareTo(MessageId o) { return messageId.compareTo(o); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 690fc89..78af44e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import org.testng.annotations.Test; @@ -117,4 +118,45 @@ public class MessageIdCompareToTest { assertTrue(batchMessageId3.compareTo(messageIdImpl) == 0, "Expected to be equal"); } + @Test + public void testMessageIdImplCompareToTopicMessageId() { + MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); + TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic", + new BatchMessageIdImpl(123L, 345L, 566, 789)); + TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic", + new BatchMessageIdImpl(123L, 345L, 567, 789)); + TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( + "test-topic", + new BatchMessageIdImpl(messageIdImpl)); + assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl.compareTo(topicMessageId2) == 0, "Expected to be equal"); + assertTrue(messageIdImpl.compareTo(topicMessageId3) == 0, "Expected to be equal"); + assertTrue(topicMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); + assertTrue(topicMessageId3.compareTo(messageIdImpl) == 0, "Expected to be equal"); + } + + @Test + public void testBatchMessageIdImplCompareToTopicMessageId() { + BatchMessageIdImpl messageIdImpl1 = new BatchMessageIdImpl(123L, 345L, 567, 789); + BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); + BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); + TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic", + new MessageIdImpl(123L, 345L, 566)); + TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( + "test-topic", + new MessageIdImpl(123L, 345L, 567)); + assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl2.compareTo(topicMessageId2) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl3.compareTo(topicMessageId2) == 0, "Expected to be equal"); + assertTrue(topicMessageId1.compareTo(messageIdImpl1) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl1) == 0, "Expected to be equal"); + assertTrue(topicMessageId2.compareTo(messageIdImpl2) == 0, "Expected to be equal"); + assertTrue(topicMessageId2.compareTo(messageIdImpl2) == 0, "Expected to be equal"); + } + }