This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 86c69e86db65c0e1d54a89dba41a6a2879b29767 Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jun 16 09:20:18 2020 +0800 Avoid introduce null read position for the managed cursor. (#7264) ### Motivation Avoid introduce null read position for the managed cursor. Here is the error log related to null read position: ``` 18:52:13.366 [pulsar-stats-updater-23-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - Got exception when creating consumer stats for subscription itom-di-dp-preload_chotest_2-reader-4bd4e3dd50: null java.lang.NullPointerException: null at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?] at org.apache.bookkeeper.mledger.impl.PositionImpl.compareTo(PositionImpl.java:92) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2] at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage(ManagedCursorImpl.java:721) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage(PersistentSubscription.java:790) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$updateRates$46(PersistentTopic.java:1419) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.persistent.PersistentTopic.updateRates(PersistentTopic.java:1387) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:134) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2] at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_242] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_242] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242] ``` The most doubtful thing is `getNextValidPosition` method in the ManagedLedgerImpl. If given a position which greater than the last add position, it will return a null value. This may cause the read position to become null. But I haven’t found how this situation appears. So in the PR, I added a log and print the stack trace which can help us to find the root cause and failback to the next position of the last position if the null next valid position occurs. (cherry picked from commit 7955cef6c5dff2f22cfc91e53d1d29562f232846) --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 13 ++++++++++++- .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 ++ .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 304429d..66e977f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -780,7 +780,7 @@ public class ManagedCursorImpl implements ManagedCursor { // validate it before preparing range PositionImpl markDeletePosition = this.markDeletePosition; PositionImpl readPosition = this.readPosition; - return (markDeletePosition.compareTo(readPosition) < 0) + return (markDeletePosition != null && readPosition != null && markDeletePosition.compareTo(readPosition) < 0) ? ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition)) : 0; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4b3937f..27d8848 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2864,11 +2864,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } public PositionImpl getNextValidPosition(final PositionImpl position) { + PositionImpl next; + try { + next = getNextValidPositionInternal(position); + } catch (NullPointerException e) { + next = lastConfirmedEntry.getNext(); + log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e); + } + return next; + } + + public PositionImpl getNextValidPositionInternal(final PositionImpl position) { PositionImpl nextPosition = position.getNext(); while (!isValidPosition(nextPosition)) { Long nextLedgerId = ledgers.ceilingKey(nextPosition.getLedgerId() + 1); if (nextLedgerId == null) { - return null; + throw new NullPointerException(); } nextPosition = PositionImpl.get(nextLedgerId, 0); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index b5b702f..e507c99 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -1966,6 +1966,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1); assertEquals(ledger.getNextValidPosition(p1), p2); assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)); + assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)); + assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a7fa72d..d7fcd41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -624,7 +624,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal future.completeExceptionally(e); } }).exceptionally(ex -> { - log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage()); + log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex); USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); future.completeExceptionally(new PersistenceException(ex)); return null;