This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c70438c Feature - support seek() on Reader (#4031)
c70438c is described below
commit c70438c63d4a12015763d2b93f65b3cd14a9635f
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Tue Apr 23 14:40:17 2019 -0300
Feature - support seek() on Reader (#4031)
*Motivation*
fix #3976
According to what was discussed in pull request #3983, an acceptable solution
is to add a seek() command to Reader in order to reset a non-durable cursor
after the Reader instance has been built.
*Modifications*
- Bugfix for reset() by timestamp on a non-durable consumer: previously the
cursor was not cached, so the state set by reset() was lost, resulting in a
reset() to the beginning of the cursor instead of a reset() to the expected
position.
- Copy seek() commands to Reader interface from Consumer interface.
- Fix inconsistency with lastDequeuedMessage field after seek() command
was
performed successfully.
- Fix consumer discarding messages on receive (after seek() command) due to
messages being present on the acknowledgment grouping tracker.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 ++-
.../apache/pulsar/client/api/TopicReaderTest.java | 122 +++++++++++++++++++++
.../java/org/apache/pulsar/client/api/Reader.java | 59 ++++++++++
.../impl/AcknowledgmentsGroupingTracker.java | 1 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 6 +
...NonPersistentAcknowledgmentGroupingTracker.java | 5 +
.../PersistentAcknowledgmentsGroupingTracker.java | 7 ++
.../org/apache/pulsar/client/impl/ReaderImpl.java | 29 +++--
8 files changed, 237 insertions(+), 9 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a4eb60..e19b913 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -850,8 +850,23 @@ public class ManagedLedgerImpl implements ManagedLedger,
CreateCallback {
checkManagedLedgerIsOpen();
checkFenced();
- return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
+ ManagedCursor cachedCursor = cursors.get(cursorName);
+ if (cachedCursor != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Cursor was already created {}", name,
cachedCursor);
+ }
+ return cachedCursor;
+ }
+
+ NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper,
config, this, cursorName,
(PositionImpl) startCursorPosition);
+
+ log.info("[{}] Opened new cursor: {}", name, cursor);
+ synchronized (this) {
+ cursors.add(cursor);
+ }
+
+ return cursor;
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index c1e93aa..baf3d13 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -510,4 +510,126 @@ public class TopicReaderTest extends ProducerConsumerBase
{
reader.close();
producer.close();
}
+
+ @Test
+ public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws
Exception {
+ final String topicName =
"persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
+ final int numOfMessage = 10;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName).create();
+
+ for (int i = 0; i < numOfMessage; i++) {
+ producer.send(String.format("msg num %d", i).getBytes());
+ }
+
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+ .startMessageId(MessageId.earliest).create();
+
+ assertTrue(reader.hasMessageAvailable());
+
+ // Read all messages the first time
+ for (int i = 0; i < numOfMessage; i++) {
+ Message<byte[]> message = reader.readNext();
+ Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ }
+
+ assertFalse(reader.hasMessageAvailable());
+
+ // Perform cursor reset by time
+ reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+
+ // Read all messages a second time after seek()
+ for (int i = 0; i < numOfMessage; i++) {
+ Message<byte[]> message = reader.readNext();
+ Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ }
+
+ // Reader should be finished
+ assertTrue(reader.isConnected());
+ assertFalse(reader.hasMessageAvailable());
+ assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(),
0);
+
+ reader.close();
+ producer.close();
+ }
+
+ @Test
+ public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws
Exception {
+ final String topicName =
"persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
+ final int numOfMessage = 100;
+ final int halfMessages = numOfMessage / 2;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName).create();
+
+ for (int i = 0; i < numOfMessage; i++) {
+ producer.send(String.format("msg num %d", i).getBytes());
+ }
+
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+ .startMessageId(MessageId.earliest).create();
+
+ assertTrue(reader.hasMessageAvailable());
+
+ // Read all messages the first time
+ MessageId midmessageToSeek = null;
+ for (int i = 0; i < numOfMessage; i++) {
+ Message<byte[]> message = reader.readNext();
+ Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+
+ if (i == halfMessages) {
+ midmessageToSeek = message.getMessageId();
+ }
+ }
+
+ assertFalse(reader.hasMessageAvailable());
+
+ // Perform cursor reset by MessageId to half of the topic
+ reader.seek(midmessageToSeek);
+
+ // Read all halved messages after seek()
+ for (int i = halfMessages + 1; i < numOfMessage; i++) {
+ Message<byte[]> message = reader.readNext();
+ Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ }
+
+ // Reader should be finished
+ assertTrue(reader.isConnected());
+ assertFalse(reader.hasMessageAvailable());
+ assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(),
0);
+
+ reader.close();
+ producer.close();
+ }
+
+ @Test
+ public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws
Exception {
+ final String topicName =
"persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
+ final int numOfMessage = 10;
+ final int halfMessages = numOfMessage / 2;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName).create();
+
+ long l = System.currentTimeMillis();
+ for (int i = 0; i < numOfMessage; i++) {
+ producer.send(String.format("msg num %d", i).getBytes());
+ Thread.sleep(100);
+ }
+
+ Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+ .startMessageId(MessageId.earliest).create();
+
+ int plusTime = (halfMessages + 1) * 100;
+ reader.seek(l + plusTime);
+
+ for (int i = halfMessages; i < numOfMessage; i++) {
+ Message<byte[]> message = reader.readNext();
+ Assert.assertEquals(message.getData(), String.format("msg num %d",
i).getBytes());
+ }
+
+ reader.close();
+ producer.close();
+ }
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index aa22cb6..c2b593b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -116,4 +116,63 @@ public interface Reader<T> extends Closeable {
* @return Whether the reader is connected to the broker
*/
boolean isConnected();
+
+ /**
+ * Reset the subscription associated with this reader to a specific
message id.
+ * <p>
+ *
+ * The message id can either be a specific message or represent the first
or last messages in the topic.
+ * <p>
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Reset the reader on the earliest
message available in the topic
+ * <li><code>MessageId.latest</code> : Reset the reader on the latest
message in the topic
+ * </ul>
+ *
+ * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param messageId the message id where to reposition the reader
+ */
+ void seek(MessageId messageId) throws PulsarClientException;
+
+ /**
+ * Reset the subscription associated with this reader to a specific
message publish time.
+ *
+ * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param timestamp the message publish time where to reposition the reader
+ */
+ void seek(long timestamp) throws PulsarClientException;
+
+ /**
+ * Reset the subscription associated with this reader to a specific
message id.
+ * <p>
+ *
+ * The message id can either be a specific message or represent the first
or last messages in the topic.
+ * <p>
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Reset the reader on the earliest
message available in the topic
+ * <li><code>MessageId.latest</code> : Reset the reader on the latest
message in the topic
+ * </ul>
+ *
+ * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param messageId the message id where to position the reader
+ * @return a future to track the completion of the seek operation
+ */
+ CompletableFuture<Void> seekAsync(MessageId messageId);
+
+ /**
+ * Reset the subscription associated with this reader to a specific
message publish time.
+ *
+ * Note: this operation can only be done on non-partitioned topics. For
these, one can rather perform the seek() on
+ * the individual partitions.
+ *
+ * @param timestamp
+ * the message publish time where to position the reader
+ * @return a future to track the completion of the seek operation
+ */
+ CompletableFuture<Void> seekAsync(long timestamp);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index 195a9bb..93600ec 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -36,4 +36,5 @@ public interface AcknowledgmentsGroupingTracker extends
AutoCloseable {
@Override
void close();
+ void flushAndClean();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9b24d35..41eaa6d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1377,6 +1377,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to publish time
{}", topic, subscription, timestamp);
+ acknowledgmentsGroupingTracker.flushAndClean();
+ lastDequeuedMessage = MessageId.earliest;
+ incomingMessages.clear();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
@@ -1408,6 +1411,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to message id
{}", topic, subscription, messageId);
+ acknowledgmentsGroupingTracker.flushAndClean();
+ lastDequeuedMessage = messageId;
+ incomingMessages.clear();
seekFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to reset subscription: {}", topic,
subscription, e.getCause().getMessage());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index fbf6d03..3c987d9 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -54,4 +54,9 @@ public class NonPersistentAcknowledgmentGroupingTracker
implements Acknowledgmen
public void close() {
// no-op
}
+
+ @Override
+ public void flushAndClean() {
+ // no-op
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index c79d24f..b40cce4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -208,6 +208,13 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
}
@Override
+ public void flushAndClean() {
+ flush();
+ lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+ pendingIndividualAcks.clear();
+ }
+
+ @Override
public void close() {
flush();
if (scheduledTask != null) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 08ca1ed..26a2ad7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -25,14 +25,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
@@ -156,4 +149,24 @@ public class ReaderImpl<T> implements Reader<T> {
public boolean isConnected() {
return consumer.isConnected();
}
+
+ @Override
+ public void seek(MessageId messageId) throws PulsarClientException {
+ consumer.seek(messageId);
+ }
+
+ @Override
+ public void seek(long timestamp) throws PulsarClientException {
+ consumer.seek(timestamp);
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(MessageId messageId) {
+ return consumer.seekAsync(messageId);
+ }
+
+ @Override
+ public CompletableFuture<Void> seekAsync(long timestamp) {
+ return consumer.seekAsync(timestamp);
+ }
}