BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639361210



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, 
String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle 
lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && 
lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = 
brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof 
AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 
0) {
+            lh.readAsync(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = 
entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        
Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor 
interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof 
AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) 
interceptor)
+                                                    
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index 
generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       After reading BK side code, the `entries` seems never to be null. Though 
checking null here doesn't affect.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to