This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 53615e05cff4010ef0834f15f057b78c1eb6fc77 Author: Qiang Zhao <mattisonc...@gmail.com> AuthorDate: Tue Jun 7 21:52:05 2022 +0800 [Fix][broker] Fix NPE when ledger id not found in `OpReadEntry` (#15837) (cherry picked from commit 7a3ad611f51511afca4bcaa1de299517a1907e8e) --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 ++---- .../bookkeeper/mledger/impl/OpReadEntry.java | 4 +-- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 32 ++++++++++++++++++++-- .../broker/service/MessageCumulativeAckTest.java | 15 ++++------ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d03d98a3f06..c79b8a9f8f7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2153,14 +2153,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) { + PositionImpl startReadOperationOnLedger(PositionImpl position) { Long ledgerId = ledgers.ceilingKey(position.getLedgerId()); - if (null == ledgerId) { - opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " + - "least key greater than or equal to the given key, or null if there is no such key"), null); - } - - if (ledgerId != position.getLedgerId()) { + if (ledgerId != null && ledgerId != position.getLedgerId()) { // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need // to skip on the next available ledger position = new PositionImpl(ledgerId, 0); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index d7eb0467f56..27e99169e31 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -48,7 +48,7 @@ class OpReadEntry implements ReadEntriesCallback { public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count, ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) { OpReadEntry op = RECYCLER.get(); - op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op); + op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef); op.cursor = cursor; op.count = count; op.callback = callback; @@ -140,7 +140,7 @@ class OpReadEntry implements ReadEntriesCallback { // We still have more entries to read from the next ledger, schedule a new async operation cursor.ledger.getExecutor().execute(safeRun(() -> { - readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this); + readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition); cursor.ledger.asyncReadEntries(OpReadEntry.this); })); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 8826f0d99fc..317fb7e2b30 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -52,6 +52,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -408,6 +409,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ledger.close(); } + @Test + public void testStartReadOperationOnLedgerWithEmptyLedgers() throws ManagedLedgerException, InterruptedException { + ManagedLedger ledger = factory.open("my_test_ledger_1"); + ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger; + NavigableMap<Long, LedgerInfo> ledgers = ledgerImpl.getLedgersInfo(); + LedgerInfo ledgerInfo = ledgers.firstEntry().getValue(); + ledgers.clear(); + ManagedCursor c1 = ledger.openCursor("c1"); + PositionImpl position = new PositionImpl(ledgerInfo.getLedgerId(), 0); + PositionImpl maxPosition = new PositionImpl(ledgerInfo.getLedgerId(), 99); + OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl) c1, position, 20, + new ReadEntriesCallback() { + + @Override + public void readEntriesComplete(List<Entry> entries, Object ctx) { + + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + + } + }, null, maxPosition); + Assert.assertEquals(opReadEntry.readPosition, position); + } + + @Test(timeOut = 20000) public void spanningMultipleLedgersWithSize() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000); @@ -2262,8 +2290,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened); managedLedger.rollCurrentLedgerIfFull(); Awaitility.await().untilAsserted(() -> { - assertEquals(managedLedger.getLedgersInfo().size(), 2); - assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger); + assertEquals(managedLedger.getLedgersInfo().size(), 3); + assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened); }); assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries()); assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 86754efc0c2..d45054fab79 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -25,7 +25,6 @@ import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclus import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover; import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared; import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared; -import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -39,7 +38,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import java.net.InetSocketAddress; -import java.util.Optional; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; @@ -79,7 +77,7 @@ public class MessageCumulativeAckTest { executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build(); ServiceConfiguration svcConfig = spy(ServiceConfiguration.class); svcConfig.setBrokerShutdownTimeoutMs(0L); - svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d)); + svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(1.0d); svcConfig.setClusterName("pulsar-cluster"); pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig); doReturn(svcConfig).when(pulsar).getConfiguration(); @@ -89,7 +87,7 @@ public class MessageCumulativeAckTest { doReturn(TransactionTestBase.createMockBookKeeper(executor)) .when(pulsar).getBookKeeperClient(); - store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build()); + store = MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build()); doReturn(store).when(pulsar).getLocalMetadataStore(); doReturn(store).when(pulsar).getConfigurationMetadataStore(); @@ -154,8 +152,7 @@ public class MessageCumulativeAckTest { @Test(timeOut = 5000, dataProvider = "individualAckModes") public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception { Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0, - "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null, - MessageId.latest, DEFAULT_CONSUMER_EPOCH); + "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest); CommandAck commandAck = new CommandAck(); commandAck.setAckType(Cumulative); @@ -169,8 +166,7 @@ public class MessageCumulativeAckTest { @Test(timeOut = 5000, dataProvider = "notIndividualAckModes") public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception { Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0, - "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null, - MessageId.latest, DEFAULT_CONSUMER_EPOCH); + "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest); CommandAck commandAck = new CommandAck(); commandAck.setAckType(Cumulative); @@ -184,8 +180,7 @@ public class MessageCumulativeAckTest { @Test(timeOut = 5000) public void testAckWithMoreThanNoneMessageIds() throws Exception { Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0, - "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null, - MessageId.latest, DEFAULT_CONSUMER_EPOCH); + "Cons1", 50000, serverCnx, "myrole-1", emptyMap(), false, CommandSubscribe.InitialPosition.Latest, null, MessageId.latest); CommandAck commandAck = new CommandAck(); commandAck.setAckType(Cumulative);