This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 372575a [issue #3975] Bugfix NPE on non durable consumer (#3988)
372575a is described below

commit 372575a9877bf50e8f55a9568ef6c07fcae86644
Author: Ezequiel Lovelle <ezequiellove...@gmail.com>
AuthorDate: Mon Apr 8 23:01:19 2019 -0300

    [issue #3975] Bugfix NPE on non durable consumer (#3988)

    *Motivation*

    Trying to fix #3975

    When a reset of a cursor is performed with some timestamp on a non-durable
    consumer the message finder will fail with null pointer exception due to
    `cursor.getName()` being null.

    *Modifications*

      - Add method overloading for `newNonDurableCursor()` with subscription name.
      - Fix method getNonDurableSubscription to call `newNonDurableCursor()` with
        proper subscription name
      - Add test to assert issue.
---
 .../apache/bookkeeper/mledger/ManagedLedger.java | 1 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++++++++
 .../broker/service/persistent/PersistentTopic.java | 2 +-
 .../apache/pulsar/client/api/TopicReaderTest.java | 28 +++++++++++++++++++++-
 4 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 00fc2d0..d51b0d8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -193,6 +193,7 @@ public interface ManagedLedger { * @return the new NonDurableCursor */ ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException; + ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException; /** * Delete a ManagedCursor asynchronously. 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a398162..7a4eb60 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -845,6 +845,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override + public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName) + throws ManagedLedgerException { + checkManagedLedgerIsOpen(); + checkFenced(); + + return new NonDurableCursorImpl(bookKeeper, config, this, cursorName, + (PositionImpl) startCursorPosition); + } + + @Override public Iterable<ManagedCursor> getCursors() { return cursors; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1039437..e1a0509 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -641,7 +641,7 @@ public class PersistentTopic implements Topic, AddEntryCallback { Position startPosition = new PositionImpl(ledgerId, entryId); ManagedCursor cursor = null; try { - cursor = ledger.newNonDurableCursor(startPosition); + cursor = ledger.newNonDurableCursor(startPosition, subscriptionName); } catch (ManagedLedgerException e) { subscriptionFuture.completeExceptionally(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 6318974..c1e93aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -34,7 +34,9 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.RelativeTimeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -479,9 +481,33 @@ public class TopicReaderTest extends ProducerConsumerBase { assertTrue(reader.hasMessageAvailable()); String readOut = new String(reader.readNext().getData()); - assertTrue(readOut.equals(content)); + assertEquals(content, readOut); assertFalse(reader.hasMessageAvailable()); } } + + @Test(timeOut = 10000) + public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception { + final int numOfMessage = 10; + final String topicName = "persistent://my-property/my-ns/ReaderSeek"; + + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName).create(); + + for (int i = 0; i < numOfMessage; i++) { + producer.send(String.format("msg num %d", i).getBytes()); + } + + Reader<byte[]> reader = pulsarClient.newReader().topic(topicName) + .startMessageId(MessageId.earliest).create(); + assertTrue(reader.hasMessageAvailable()); + + ((ReaderImpl) reader).getConsumer().seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m")); + + assertTrue(reader.hasMessageAvailable()); + + reader.close(); + producer.close(); + } }