codelipenghui commented on code in PR #22034: URL: https://github.com/apache/pulsar/pull/22034#discussion_r1493743281
########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java: ########## @@ -2667,7 +2664,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) { ledgers.headMap(slowestReaderLedgerId, false).values().iterator(); while (ledgerInfoIterator.hasNext()){ LedgerInfo ls = ledgerInfoIterator.next(); - // currentLedger can not be deleted + // If the current ledger is closed, the new ledger will be created later and this current ledger will + // be deleted next time. Review Comment: I don't think we need to change the description here. ########## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java: ########## @@ -1754,10 +1754,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); - if (!pendingAddEntries.isEmpty()) { - // Need to create a new ledger to write pending entries - createLedgerAfterClosed(); - } + createLedgerAfterClosed(); Review Comment: We'd better also remove the other places that call `createLedgerAfterClosed` directly from the ManagedLedgerImpl.java - rollCurrentLedgerIfFull - checkInactiveLedgerAndRollOver The solution introduced a behavior change in the broker. But it looks make sense. All the other places (managed ledger initialization, the task for checking the maximum rollover time ...) are proactively create a new ledger if the current ledger is closed except the ledger rollover by the entry size and entry count. With this solution we can get a better publish latency for the newly published message to a new Ledger (It requires to create a Ledger first and write the message to the Ledger). If we want to move to lazy ledger creation to reduce the ledgers count. We also need to apply the lazy creation to the managed ledger initialization and other places. It could be different case with this fix. ########## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ########## @@ -4232,4 +4232,24 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + @Test + public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception { + ManagedLedgerConfig config = spy(new ManagedLedgerConfig()); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", config)); + assertEquals(ml.ledgers.size(), 1); + ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS); + Thread.sleep(10); + ml.addEntry(new byte[4]); + ml.internalTrimLedgers(false, Futures.NULL_PROMISE); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger); + assertEquals(ml.ledgers.size(), 0); Review Comment: And the current ledger should not be removed? ########## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ########## @@ -4232,4 +4232,24 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + @Test + public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception { + ManagedLedgerConfig config = spy(new ManagedLedgerConfig()); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", config)); + assertEquals(ml.ledgers.size(), 1); + ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS); Review Comment: It looks the change will not effect the ledger rollover by maximum rollover time, right? ########## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ########## @@ -4232,4 +4232,24 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + @Test + public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception { + ManagedLedgerConfig config = spy(new ManagedLedgerConfig()); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", config)); + assertEquals(ml.ledgers.size(), 1); + ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS); + Thread.sleep(10); + ml.addEntry(new byte[4]); + ml.internalTrimLedgers(false, Futures.NULL_PROMISE); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger); Review Comment: It should be wrong. A new ledger will be created after the rollover. ########## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ########## @@ -4232,4 +4232,24 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + @Test + public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception { + ManagedLedgerConfig config = spy(new ManagedLedgerConfig()); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", config)); + assertEquals(ml.ledgers.size(), 1); + ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS); + Thread.sleep(10); + ml.addEntry(new byte[4]); + ml.internalTrimLedgers(false, Futures.NULL_PROMISE); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger); + assertEquals(ml.ledgers.size(), 0); Review Comment: I guess @liangyepianzhou you need to refine your test to have a clear scope of what is tested in this method. ########## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ########## @@ -4232,4 +4232,24 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce verify(ledgerOffloader, times(0)) .deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap()); } + + @Test + public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception { + ManagedLedgerConfig config = spy(new ManagedLedgerConfig()); + ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", config)); Review Comment: Any reason to use `spy()` here? You can set any configurations you want via a new ManagedLedgerConfig and create a new Ledger with the config instance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org