BewareMyPower commented on a change in pull request #10706: URL: https://github.com/apache/pulsar/pull/10706#discussion_r639357679
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java ########## @@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMa } @Override - public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { - LedgerEntries ledgerEntries = null; - try { - for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { - if (interceptor instanceof AppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) { - ledgerEntries = - lh.read(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()); - for (LedgerEntry entry : ledgerEntries) { - BrokerEntryMetadata brokerEntryMetadata = - Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer()); - if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { - ((AppendIndexMetadataInterceptor) interceptor) - .recoveryIndexGenerator(brokerEntryMetadata.getIndex()); + public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { + CompletableFuture<Void> promise = new CompletableFuture<>(); + boolean hasAppendIndexMetadataInterceptor = brokerEntryMetadataInterceptors.stream() + .anyMatch(interceptor -> interceptor instanceof AppendIndexMetadataInterceptor); + if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 0) { + lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> { + if (ex != null) { + log.error("[{}] Read last entry error.", name, ex); + promise.completeExceptionally(ex); + } else { + if (entries != null) { + try { + LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed()); + if (ledgerEntry != null) { + BrokerEntryMetadata brokerEntryMetadata = + Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer()); + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + ((AppendIndexMetadataInterceptor) interceptor) + .recoveryIndexGenerator(brokerEntryMetadata.getIndex()); + } + } + } + } + entries.close(); + promise.complete(null); + } catch (Exception e) { + log.error("[{}] Failed to recover the index generator from the last add confirmed entry.", + name, e); + promise.completeExceptionally(e); } + } else { + promise.complete(null); } Review comment: When does `ReadHandle#readAsync` returns a null entries? Is it an exceptional case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org