This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 77833d90dafe36e62b7962af60f25fc54389960b Author: Yijie Shen <[email protected]> AuthorDate: Mon Feb 24 12:24:58 2020 +0800 [Reader] Should set either start message id or start message from roll back duration. (#6392) Currently, when constructing a reader, users can set both start message id and start time. This is strange and the behavior should be forbidden. (cherry picked from commit f862961cb84c0cc19dff29b8db5a54a6c578fbe4) --- .../org/apache/pulsar/client/impl/ReaderTest.java | 2 +- .../pulsar/client/impl/ReaderBuilderImpl.java | 10 +++++++-- .../apache/pulsar/client/impl/BuildersTest.java | 25 ++++++++++++++++++++++ 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java index 55daf6b..b6c7c01 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java @@ -229,7 +229,7 @@ public class ReaderTest extends MockedPulsarServiceBaseTest { // (3) Create reader and set position 1 hour back so, it should only read messages which are 2 hours old which // published on step 2 - Reader<byte[]> reader = pulsarClient.newReader().topic(topic).startMessageId(MessageId.earliest) + Reader<byte[]> reader = pulsarClient.newReader().topic(topic) .startMessageFromRollbackDuration(2, TimeUnit.HOURS).create(); List<MessageId> receivedMessageIds = Lists.newArrayList(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index c024a0b..921cf2d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -78,9 +78,15 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> { .failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder")); } - if (conf.getStartMessageId() == null) { + if (conf.getStartMessageId() != null && conf.getStartMessageFromRollbackDurationInSec() > 0 || + conf.getStartMessageId() == null && conf.getStartMessageFromRollbackDurationInSec() <= 0) { return FutureUtil - .failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder")); + .failedFuture(new IllegalArgumentException( + "Start message id or start message from roll back must be specified but they cannot be specified at the same time")); + } + + if (conf.getStartMessageFromRollbackDurationInSec() > 0) { + conf.setStartMessageId(MessageId.earliest); } return client.createReaderAsync(conf, schema); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java index 2560d67..4f7554e 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java @@ -26,8 +26,12 @@ import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.testng.annotations.Test; @@ -96,5 +100,26 @@ public class BuildersTest { assertTrue(obj instanceof ReaderConfigurationData); assertEquals(((ReaderConfigurationData) obj).getTopicName(), topicName); assertEquals(((ReaderConfigurationData) obj).getStartMessageId(), messageId); + client.close(); + } + + @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") + public void shouldNotSetTwoOptAtTheSameTime() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + try (Reader reader = client.newReader().topic("abc").startMessageId(MessageId.earliest).startMessageFromRollbackDuration(10, TimeUnit.HOURS).create()) { + // no-op + } finally { + client.close(); + } + } + + @Test(expectedExceptions = {PulsarClientException.class}, expectedExceptionsMessageRegExp = ".* must be specified but they cannot be specified at the same time.*") + public void shouldSetOneStartOpt() throws Exception { + PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); + try (Reader reader = client.newReader().topic("abc").create()) { + // no-op + } finally { + client.close(); + } } }
