AnonHxy commented on code in PR #18265:
URL: https://github.com/apache/pulsar/pull/18265#discussion_r1023716092


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,365 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;

Review Comment:
   It seems that this variable is unused



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##########
@@ -335,7 +348,7 @@ public TransportCnx getCnx() {
         return this.cnx;
     }
 
-    private static final class MessagePublishContext implements 
PublishContext, Runnable {
+    private static final class MessagePublishContext implements 
PublishContext, Runnable, Position {

Review Comment:
   OK. I see. Should we write some comments to explain this?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,365 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {
+            this.shadowSource =
+                    
TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex());
+        } else {
+            this.shadowSource = TopicName.get(config.getShadowSource());
+        }
+        this.sourceMLName =
+                shadowSource.getPersistenceNamingEncoding();
     }
 
+    /**
+     * ShadowManagedLedger init steps:
+     * 1. this.initialize : read source managedLedgerInfo
+     * 2. super.initialize : read its own read source managedLedgerInfo
+     * 3. this.initializeBookKeeper
+     * 4. super.initializeCursors
+     * @param callback
+     * @param ctx
+     */
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
-        // TODO: ShadowManagedLedger has different initialize process from 
normal ManagedLedger,
-        //  which is complicated and will be implemented in the next PRs.
-        super.initialize(callback, ctx);
+        log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
+
+        executor.executeOrdered(name, safeRun(() -> doInitialize(callback, 
ctx)));
+    }
+
+    private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
+        // Fetch the list of existing ledgers in the source managed ledger
+        store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
+                executor.executeOrdered(name, safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+        );
+        store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Source ML info:{}", name, 
sourceMLName, mlInfo);
+                }
+                sourceLedgersStat = stat;
+                // Fails if init with empty ledger. Very small chance here, 
since shadow topic is
+                // created when source topic exists.
+                if (mlInfo.getLedgerInfoCount() == 0) {
+                    log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName,
+                            mlInfo, stat);
+//                    callback.initializeFailed(new 
ManagedLedgerException.ManagedLedgerSourceNotReadyException(

Review Comment:
   Cleanup this commented code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to