This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 372575a  [issue #3975] Bugfix NPE on non durable consumer (#3988)
372575a is described below

commit 372575a9877bf50e8f55a9568ef6c07fcae86644
Author: Ezequiel Lovelle <ezequiellove...@gmail.com>
AuthorDate: Mon Apr 8 23:01:19 2019 -0300

    [issue #3975] Bugfix NPE on non durable consumer (#3988)
    
    *Motivation*
    
    Trying to fix #3975
    
    When a cursor reset by timestamp is performed on a non-durable consumer,
    the message finder fails with a NullPointerException because
    `cursor.getName()` is null.
    
    *Modifications*
    
      - Add an overload of `newNonDurableCursor()` that takes a subscription name.
      - Fix method getNonDurableSubscription to call `newNonDurableCursor()` with
        the proper subscription name.
      - Add a test that reproduces the issue.
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  1 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++++++++
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../apache/pulsar/client/api/TopicReaderTest.java  | 28 +++++++++++++++++++++-
 4 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 00fc2d0..d51b0d8 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -193,6 +193,7 @@ public interface ManagedLedger {
      * @return the new NonDurableCursor
      */
     ManagedCursor newNonDurableCursor(Position startCursorPosition) throws 
ManagedLedgerException;
+    ManagedCursor newNonDurableCursor(Position startPosition, String 
subscriptionName) throws ManagedLedgerException;
 
     /**
      * Delete a ManagedCursor asynchronously.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a398162..7a4eb60 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -845,6 +845,16 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     }
 
     @Override
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition, 
String cursorName)
+            throws ManagedLedgerException {
+        checkManagedLedgerIsOpen();
+        checkFenced();
+
+        return new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
+                (PositionImpl) startCursorPosition);
+    }
+
+    @Override
     public Iterable<ManagedCursor> getCursors() {
         return cursors;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1039437..e1a0509 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -641,7 +641,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
             Position startPosition = new PositionImpl(ledgerId, entryId);
             ManagedCursor cursor = null;
             try {
-                cursor = ledger.newNonDurableCursor(startPosition);
+                cursor = ledger.newNonDurableCursor(startPosition, 
subscriptionName);
             } catch (ManagedLedgerException e) {
                 subscriptionFuture.completeExceptionally(e);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
index 6318974..c1e93aa 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java
@@ -34,7 +34,9 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ReaderImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.RelativeTimeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -479,9 +481,33 @@ public class TopicReaderTest extends ProducerConsumerBase {
             assertTrue(reader.hasMessageAvailable());
 
             String readOut = new String(reader.readNext().getData());
-            assertTrue(readOut.equals(content));
+            assertEquals(content, readOut);
             assertFalse(reader.hasMessageAvailable());
         }
 
     }
+
+    @Test(timeOut = 10000)
+    public void testReaderNonDurableIsAbleToSeekRelativeTime() throws 
Exception {
+        final int numOfMessage = 10;
+        final String topicName = "persistent://my-property/my-ns/ReaderSeek";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName).create();
+
+        for (int i = 0; i < numOfMessage; i++) {
+            producer.send(String.format("msg num %d", i).getBytes());
+        }
+
+        Reader<byte[]> reader = pulsarClient.newReader().topic(topicName)
+                .startMessageId(MessageId.earliest).create();
+        assertTrue(reader.hasMessageAvailable());
+
+        ((ReaderImpl) 
reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
+
+        assertTrue(reader.hasMessageAvailable());
+
+        reader.close();
+        producer.close();
+    }
 }

Reply via email to