BewareMyPower commented on a change in pull request #10706:
URL: https://github.com/apache/pulsar/pull/10706#discussion_r639357679



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
##########
@@ -80,31 +80,47 @@ public void onManagedLedgerPropertiesInitialize(Map<String, 
String> propertiesMa
     }
 
     @Override
-    public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle 
lh) {
-        LedgerEntries ledgerEntries = null;
-        try {
-            for (BrokerEntryMetadataInterceptor interceptor : 
brokerEntryMetadataInterceptors) {
-                if (interceptor instanceof AppendIndexMetadataInterceptor && 
lh.getLastAddConfirmed() >= 0) {
-                    ledgerEntries =
-                            lh.read(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed());
-                    for (LedgerEntry entry : ledgerEntries) {
-                        BrokerEntryMetadata brokerEntryMetadata =
-                                
Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer());
-                        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
-                            ((AppendIndexMetadataInterceptor) interceptor)
-                                    
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+    public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String 
name, LedgerHandle lh) {
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        boolean hasAppendIndexMetadataInterceptor = 
brokerEntryMetadataInterceptors.stream()
+                .anyMatch(interceptor -> interceptor instanceof 
AppendIndexMetadataInterceptor);
+        if (hasAppendIndexMetadataInterceptor && lh.getLastAddConfirmed() >= 
0) {
+            lh.readAsync(lh.getLastAddConfirmed(), 
lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
+                if (ex != null) {
+                    log.error("[{}] Read last entry error.", name, ex);
+                    promise.completeExceptionally(ex);
+                } else {
+                    if (entries != null) {
+                        try {
+                            LedgerEntry ledgerEntry = 
entries.getEntry(lh.getLastAddConfirmed());
+                            if (ledgerEntry != null) {
+                                BrokerEntryMetadata brokerEntryMetadata =
+                                        
Commands.parseBrokerEntryMetadataIfExist(ledgerEntry.getEntryBuffer());
+                                for (BrokerEntryMetadataInterceptor 
interceptor : brokerEntryMetadataInterceptors) {
+                                    if (interceptor instanceof 
AppendIndexMetadataInterceptor) {
+                                        if (brokerEntryMetadata != null && 
brokerEntryMetadata.hasIndex()) {
+                                            ((AppendIndexMetadataInterceptor) 
interceptor)
+                                                    
.recoveryIndexGenerator(brokerEntryMetadata.getIndex());
+                                        }
+                                    }
+                                }
+                            }
+                            entries.close();
+                            promise.complete(null);
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to recover the index 
generator from the last add confirmed entry.",
+                                    name, e);
+                            promise.completeExceptionally(e);
                         }
+                    } else {
+                        promise.complete(null);
                     }

Review comment:
       When does `ReadHandle#readAsync` returns a null entries? Is it an 
exceptional case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to