This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c70438c  Feature - support seek() on Reader (#4031)
c70438c is described below

commit c70438c63d4a12015763d2b93f65b3cd14a9635f
Author: Ezequiel Lovelle <[email protected]>
AuthorDate: Tue Apr 23 14:40:17 2019 -0300

    Feature - support seek() on Reader (#4031)
    
    
    *Motivation*
    
     fix #3976
    
    According to what was discussed in pull #3983, it would be an acceptable
    solution to add a seek() command to Reader in order to reset a non-durable
    cursor after the Reader instance was built.
    
    *Modifications*
    
      - Bugfix reset() by timestamp on a non-durable consumer: previously the
        cached cursor was not present, so the state set by reset() was lost,
        resulting in a reset() to the beginning of the cursor instead of a
        reset() to the expected position.
      - Copy seek() commands to Reader interface from Consumer interface.
      - Fix inconsistency with the lastDequeuedMessage field after a seek()
        command was performed successfully.
      - Fix consumer discarding messages on receive (after seek() command) due
        to messages being present on the acknowledgment grouping tracker.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  17 ++-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 122 +++++++++++++++++++++
 .../java/org/apache/pulsar/client/api/Reader.java  |  59 ++++++++++
 .../impl/AcknowledgmentsGroupingTracker.java       |   1 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   6 +
 ...NonPersistentAcknowledgmentGroupingTracker.java |   5 +
 .../PersistentAcknowledgmentsGroupingTracker.java  |   7 ++
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  29 +++--
 8 files changed, 237 insertions(+), 9 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a4eb60..e19b913 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -850,8 +850,23 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
         checkManagedLedgerIsOpen();
         checkFenced();
 
-        return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
+        ManagedCursor cachedCursor = cursors.get(cursorName);
+        if (cachedCursor != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cursor was already created {}", name, 
cachedCursor);
+            }
+            return cachedCursor;
+        }
+
+        NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, 
config, this, cursorName,
                 (PositionImpl) startCursorPosition);
+
+        log.info("[{}] Opened new cursor: {}", name, cursor);
+        synchronized (this) {
+            cursors.add(cursor);
+        }
+
+        return cursor;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index c1e93aa..baf3d13 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -510,4 +510,126 @@ public class TopicReaderTest extends ProducerConsumerBase 
{
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws 
Exception {
+        final String topicName = 
"persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic";
+        final int numOfMessage = 10;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).create();
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.send(String.format("msg num %d", i).getBytes());
+        }
+
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
+
+        assertTrue(reader.hasMessageAvailable());
+
+        // Read all messages the first time
+        for (int i = 0; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+        }
+
+        assertFalse(reader.hasMessageAvailable());
+
+        // Perform cursor reset by time
+        reader.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+
+        // Read all messages a second time after seek()
+        for (int i = 0; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+        }
+
+        // Reader should be finished
+        assertTrue(reader.isConnected());
+        assertFalse(reader.hasMessageAvailable());
+        assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 
0);
+
+        reader.close();
+        producer.close();
+    }
+
+    @Test
+    public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws 
Exception {
+        final String topicName = 
"persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic";
+        final int numOfMessage = 100;
+        final int halfMessages = numOfMessage / 2;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).create();
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.send(String.format("msg num %d", i).getBytes());
+        }
+
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
+
+        assertTrue(reader.hasMessageAvailable());
+
+        // Read all messages the first time
+        MessageId midmessageToSeek = null;
+        for (int i = 0; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+
+            if (i == halfMessages) {
+                midmessageToSeek = message.getMessageId();
+            }
+        }
+
+        assertFalse(reader.hasMessageAvailable());
+
+        // Perform cursor reset by MessageId to half of the topic
+        reader.seek(midmessageToSeek);
+
+        // Read all halved messages after seek()
+        for (int i = halfMessages + 1; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+        }
+
+        // Reader should be finished
+        assertTrue(reader.isConnected());
+        assertFalse(reader.hasMessageAvailable());
+        assertEquals(((ReaderImpl) reader).getConsumer().numMessagesInQueue(), 
0);
+
+        reader.close();
+        producer.close();
+    }
+
+    @Test
+    public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws 
Exception {
+        final String topicName = 
"persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic";
+        final int numOfMessage = 10;
+        final int halfMessages = numOfMessage / 2;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).create();
+
+        long l = System.currentTimeMillis();
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.send(String.format("msg num %d", i).getBytes());
+            Thread.sleep(100);
+        }
+
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
+
+        int plusTime = (halfMessages + 1) * 100;
+        reader.seek(l + plusTime);
+
+        for (int i = halfMessages; i < numOfMessage; i++) {
+            Message<byte[]> message = reader.readNext();
+            Assert.assertEquals(message.getData(), String.format("msg num %d", 
i).getBytes());
+        }
+
+        reader.close();
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
index aa22cb6..c2b593b 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
@@ -116,4 +116,63 @@ public interface Reader<T> extends Closeable {
      * @return Whether the reader is connected to the broker
      */
     boolean isConnected();
+
+    /**
+     * Reset the subscription associated with this reader to a specific 
message id.
+     * <p>
+     *
+     * The message id can either be a specific message or represent the first 
or last messages in the topic.
+     * <p>
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Reset the reader on the earliest 
message available in the topic
+     * <li><code>MessageId.latest</code> : Reset the reader on the latest 
message in the topic
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param messageId the message id where to reposition the reader
+     */
+    void seek(MessageId messageId) throws PulsarClientException;
+
+    /**
+     * Reset the subscription associated with this reader to a specific 
message publish time.
+     *
+     * Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param timestamp the message publish time where to reposition the reader
+     */
+    void seek(long timestamp) throws PulsarClientException;
+
+    /**
+     * Reset the subscription associated with this reader to a specific 
message id.
+     * <p>
+     *
+     * The message id can either be a specific message or represent the first 
or last messages in the topic.
+     * <p>
+     * <ul>
+     * <li><code>MessageId.earliest</code> : Reset the reader on the earliest 
message available in the topic
+     * <li><code>MessageId.latest</code> : Reset the reader on the latest 
message in the topic
+     * </ul>
+     *
+     * Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param messageId the message id where to position the reader
+     * @return a future to track the completion of the seek operation
+     */
+    CompletableFuture<Void> seekAsync(MessageId messageId);
+
+    /**
+     * Reset the subscription associated with this reader to a specific 
message publish time.
+     *
+     * Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the seek() on
+     * the individual partitions.
+     *
+     * @param timestamp
+     *            the message publish time where to position the reader
+     * @return a future to track the completion of the seek operation
+     */
+    CompletableFuture<Void> seekAsync(long timestamp);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
index 195a9bb..93600ec 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
@@ -36,4 +36,5 @@ public interface AcknowledgmentsGroupingTracker extends 
AutoCloseable {
     @Override
     void close();
 
+    void flushAndClean();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 9b24d35..41eaa6d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1377,6 +1377,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
             log.info("[{}][{}] Successfully reset subscription to publish time 
{}", topic, subscription, timestamp);
+            acknowledgmentsGroupingTracker.flushAndClean();
+            lastDequeuedMessage = MessageId.earliest;
+            incomingMessages.clear();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
@@ -1408,6 +1411,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
             log.info("[{}][{}] Successfully reset subscription to message id 
{}", topic, subscription, messageId);
+            acknowledgmentsGroupingTracker.flushAndClean();
+            lastDequeuedMessage = messageId;
+            incomingMessages.clear();
             seekFuture.complete(null);
         }).exceptionally(e -> {
             log.error("[{}][{}] Failed to reset subscription: {}", topic, 
subscription, e.getCause().getMessage());
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
index fbf6d03..3c987d9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NonPersistentAcknowledgmentGroupingTracker.java
@@ -54,4 +54,9 @@ public class NonPersistentAcknowledgmentGroupingTracker 
implements Acknowledgmen
     public void close() {
         // no-op
     }
+
+    @Override
+    public void flushAndClean() {
+        // no-op
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index c79d24f..b40cce4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -208,6 +208,13 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
     }
 
     @Override
+    public void flushAndClean() {
+        flush();
+        lastCumulativeAck = (MessageIdImpl) MessageId.earliest;
+        pendingIndividualAcks.clear();
+    }
+
+    @Override
     public void close() {
         flush();
         if (scheduledTask != null) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 08ca1ed..26a2ad7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -25,14 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderListener;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
@@ -156,4 +149,24 @@ public class ReaderImpl<T> implements Reader<T> {
     public boolean isConnected() {
         return consumer.isConnected();
     }
+
+    @Override
+    public void seek(MessageId messageId) throws PulsarClientException {
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void seek(long timestamp) throws PulsarClientException {
+        consumer.seek(timestamp);
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        return consumer.seekAsync(messageId);
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(long timestamp) {
+        return consumer.seekAsync(timestamp);
+    }
 }

Reply via email to