codelipenghui commented on code in PR #18265:
URL: https://github.com/apache/pulsar/pull/18265#discussion_r1030475063


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {

Review Comment:
   Why the `name` is not able to be used? It's a little confusing that we need 
to introduce the topic name in `ManagedLedgerConfig`
   
   Or maybe we can introduce a `properties`?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {
+            this.shadowSource =
+                    
TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex());
+        } else {
+            this.shadowSource = TopicName.get(config.getShadowSource());
+        }
+        this.sourceMLName =
+                shadowSource.getPersistenceNamingEncoding();
     }
 
+    /**
+     * ShadowManagedLedger init steps:
+     * 1. this.initialize : read source managedLedgerInfo
+     * 2. super.initialize : read its own read source managedLedgerInfo
+     * 3. this.initializeBookKeeper
+     * 4. super.initializeCursors
+     * @param callback
+     * @param ctx
+     */
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
-        // TODO: ShadowManagedLedger has different initialize process from 
normal ManagedLedger,
-        //  which is complicated and will be implemented in the next PRs.
-        super.initialize(callback, ctx);
+        log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
+
+        executor.executeOrdered(name, safeRun(() -> doInitialize(callback, 
ctx)));
+    }
+
+    private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
+        // Fetch the list of existing ledgers in the source managed ledger
+        store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
+                executor.executeOrdered(name, safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+        );
+        store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Source ML info:{}", name, 
sourceMLName, mlInfo);
+                }
+                sourceLedgersStat = stat;
+                if (mlInfo.getLedgerInfoCount() == 0) {
+                    // Small chance here, since shadow topic is created after 
source topic exists.
+                    log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName,
+                            mlInfo, stat);
+                    ShadowManagedLedgerImpl.super.initialize(callback, ctx);
+                    return;
+                }
+
+                if (mlInfo.hasTerminatedPosition()) {
+                    state = State.Terminated;
+                    lastConfirmedEntry = new 
PositionImpl(mlInfo.getTerminatedPosition());
+                    log.info("[{}][{}] Recovering managed ledger terminated at 
{}", name, sourceMLName,
+                            lastConfirmedEntry);
+                }
+
+                for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
+                    ledgers.put(ls.getLedgerId(), ls);
+                }
+
+                final long lastLedgerId = ledgers.lastKey();
+                mbean.startDataLedgerOpenOp();
+                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.executeOrdered(name, safeRun(() -> {
+                    mbean.endDataLedgerOpenOp();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Opened source ledger {}", name, 
lastLedgerId);
+                    }
+                    if (rc == BKException.Code.OK) {
+                        LedgerInfo info =
+                                LedgerInfo.newBuilder()
+                                        .setLedgerId(lastLedgerId)
+                                        .setEntries(lh.getLastAddConfirmed() + 
1)
+                                        .setSize(lh.getLength())
+                                        .setTimestamp(clock.millis()).build();
+                        ledgers.put(lastLedgerId, info);
+
+                        //Always consider the last ledger is opened in source.
+                        STATE_UPDATER.set(ShadowManagedLedgerImpl.this, 
State.LedgerOpened);
+                        currentLedger = lh;
+
+                        if (managedLedgerInterceptor != null) {
+                            
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
+                                    .thenRun(() -> 
ShadowManagedLedgerImpl.super.initialize(callback, ctx))
+                                    .exceptionally(ex -> {
+                                        callback.initializeFailed(
+                                                new 
ManagedLedgerException.ManagedLedgerInterceptException(
+                                                        ex.getCause()));
+                                        return null;
+                                    });
+                        } else {
+                            ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                        }
+                    } else if (isNoSuchLedgerExistsException(rc)) {
+                        log.warn("[{}] Source ledger not found: {}", name, 
lastLedgerId);
+                        ledgers.remove(lastLedgerId);
+                        ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                    } else {
+                        log.error("[{}] Failed to open source ledger {}: {}", 
name, lastLedgerId,
+                                BKException.getMessage(rc));
+                        
callback.initializeFailed(createManagedLedgerException(rc));
+                    }
+                }));
+                //open ledger in readonly mode.
+                bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(), opencb, null);
+
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                if (e instanceof 
ManagedLedgerException.MetadataNotFoundException) {
+                    callback.initializeFailed(new 
ManagedLedgerException.ManagedLedgerNotFoundException(e));
+                } else {
+                    callback.initializeFailed(new ManagedLedgerException(e));
+                }
+            }
+        });
     }
 
     public TopicName getShadowSource() {
         return shadowSource;
     }
+
+    @Override
+    protected boolean isLedgersReadonly() {
+        return true;
+    }
+
+    @Override
+    protected synchronized void 
initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] initializing bookkeeper for shadowManagedLedger; 
ledgers {}", name, ledgers);
+        }
+
+        // Calculate total entries and size
+        Iterator<LedgerInfo> iterator = ledgers.values().iterator();
+        while (iterator.hasNext()) {
+            LedgerInfo li = iterator.next();
+            if (li.getEntries() > 0) {
+                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
+                TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
+            } else if (li.getLedgerId() != currentLedger.getId()) {
+                //do not remove the last empty ledger.
+                iterator.remove();
+            }
+        }
+
+        initLastConfirmedEntry();
+        // Save it back to ensure all nodes exist and properties are persisted.
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, 
new MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                ledgersStat = stat;
+                initializeCursors(callback);
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                handleBadVersion(e);
+                callback.initializeFailed(new ManagedLedgerException(e));
+            }
+        });
+    }
+
+    private void initLastConfirmedEntry() {
+        if (lastConfirmedEntry != null || currentLedger == null) {
+            return;
+        }
+        lastConfirmedEntry = new PositionImpl(currentLedger.getId(), 
currentLedger.getLastAddConfirmed());
+        // bypass empty ledgers, find last ledger with Message if possible.
+        while (lastConfirmedEntry.getEntryId() == -1) {
+            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+            if (formerLedger != null) {
+                LedgerInfo ledgerInfo = formerLedger.getValue();
+                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
+        if (!beforeAddEntry(addOperation)) {
+            return;
+        }
+        if (state == State.Terminated) {
+            addOperation.failed(new 
ManagedLedgerException.ManagedLedgerTerminatedException(
+                    "Managed ledger was already terminated"));
+            return;
+        }
+        if (state != State.LedgerOpened) {
+            addOperation.failed(new ManagedLedgerException("Managed ledger is 
not opened"));
+            return;
+        }
+
+        if (addOperation.getCtx() == null || !(addOperation.getCtx() 
instanceof Position position)) {
+            addOperation.failed(new ManagedLedgerException("Illegal 
addOperation context object."));
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Add entry into shadow ledger lh={} entries={}, 
pos=({},{})",
+                    name, currentLedger.getId(), currentLedgerEntries, 
position.getLedgerId(), position.getEntryId());
+        }
+        pendingAddEntries.add(addOperation);
+        if (position.getLedgerId() == currentLedger.getId()) {
+            // Write into lastLedger
+            addOperation.setLedger(currentLedger);
+            currentLedgerEntries = position.getEntryId();
+            currentLedgerSize += addOperation.data.readableBytes();
+            addOperation.initiateShadowWrite();
+        }
+        lastAddEntryTimeMs = System.currentTimeMillis();
+    }
+
+    /**
+     * Handle source ManagedLedgerInfo updates.
+     * Update types:
+     * 1. new ledgers.
+     * 2. old ledgers deleted.
+     * 3. old ledger offload info updated (including ledger deleted from 
bookie by offloader)
+     *
+     */
+    private synchronized void 
processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat 
stat) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, 
prevStat={},stat={}", name, sourceMLName, mlInfo,
+                    sourceLedgersStat, stat);
+        }
+
+        sourceLedgersStat = stat;

Review Comment:
   Do we need to check the data version?
   Maybe the watcher gets a notification first, and then we get the response 
from get managed ledger info operation. Will we add the deleted ledger back?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {
+            this.shadowSource =
+                    
TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex());
+        } else {
+            this.shadowSource = TopicName.get(config.getShadowSource());
+        }
+        this.sourceMLName =
+                shadowSource.getPersistenceNamingEncoding();
     }
 
+    /**
+     * ShadowManagedLedger init steps:
+     * 1. this.initialize : read source managedLedgerInfo
+     * 2. super.initialize : read its own read source managedLedgerInfo
+     * 3. this.initializeBookKeeper
+     * 4. super.initializeCursors
+     * @param callback
+     * @param ctx
+     */
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
-        // TODO: ShadowManagedLedger has different initialize process from 
normal ManagedLedger,
-        //  which is complicated and will be implemented in the next PRs.
-        super.initialize(callback, ctx);
+        log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
+
+        executor.executeOrdered(name, safeRun(() -> doInitialize(callback, 
ctx)));
+    }
+
+    private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
+        // Fetch the list of existing ledgers in the source managed ledger
+        store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
+                executor.executeOrdered(name, safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+        );
+        store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Source ML info:{}", name, 
sourceMLName, mlInfo);
+                }
+                sourceLedgersStat = stat;
+                if (mlInfo.getLedgerInfoCount() == 0) {
+                    // Small chance here, since shadow topic is created after 
source topic exists.
+                    log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName,
+                            mlInfo, stat);
+                    ShadowManagedLedgerImpl.super.initialize(callback, ctx);
+                    return;
+                }
+
+                if (mlInfo.hasTerminatedPosition()) {
+                    state = State.Terminated;
+                    lastConfirmedEntry = new 
PositionImpl(mlInfo.getTerminatedPosition());
+                    log.info("[{}][{}] Recovering managed ledger terminated at 
{}", name, sourceMLName,
+                            lastConfirmedEntry);
+                }
+
+                for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
+                    ledgers.put(ls.getLedgerId(), ls);
+                }
+
+                final long lastLedgerId = ledgers.lastKey();
+                mbean.startDataLedgerOpenOp();
+                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.executeOrdered(name, safeRun(() -> {
+                    mbean.endDataLedgerOpenOp();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Opened source ledger {}", name, 
lastLedgerId);
+                    }
+                    if (rc == BKException.Code.OK) {
+                        LedgerInfo info =
+                                LedgerInfo.newBuilder()
+                                        .setLedgerId(lastLedgerId)
+                                        .setEntries(lh.getLastAddConfirmed() + 
1)
+                                        .setSize(lh.getLength())
+                                        .setTimestamp(clock.millis()).build();
+                        ledgers.put(lastLedgerId, info);
+
+                        //Always consider the last ledger is opened in source.
+                        STATE_UPDATER.set(ShadowManagedLedgerImpl.this, 
State.LedgerOpened);
+                        currentLedger = lh;
+
+                        if (managedLedgerInterceptor != null) {
+                            
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
+                                    .thenRun(() -> 
ShadowManagedLedgerImpl.super.initialize(callback, ctx))
+                                    .exceptionally(ex -> {
+                                        callback.initializeFailed(
+                                                new 
ManagedLedgerException.ManagedLedgerInterceptException(
+                                                        ex.getCause()));
+                                        return null;
+                                    });
+                        } else {
+                            ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                        }
+                    } else if (isNoSuchLedgerExistsException(rc)) {
+                        log.warn("[{}] Source ledger not found: {}", name, 
lastLedgerId);
+                        ledgers.remove(lastLedgerId);
+                        ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                    } else {
+                        log.error("[{}] Failed to open source ledger {}: {}", 
name, lastLedgerId,
+                                BKException.getMessage(rc));
+                        
callback.initializeFailed(createManagedLedgerException(rc));
+                    }
+                }));
+                //open ledger in readonly mode.
+                bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(), opencb, null);
+
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                if (e instanceof 
ManagedLedgerException.MetadataNotFoundException) {
+                    callback.initializeFailed(new 
ManagedLedgerException.ManagedLedgerNotFoundException(e));
+                } else {
+                    callback.initializeFailed(new ManagedLedgerException(e));
+                }
+            }
+        });
     }
 
     public TopicName getShadowSource() {
         return shadowSource;
     }
+
+    @Override
+    protected boolean isLedgersReadonly() {
+        return true;
+    }
+
+    @Override
+    protected synchronized void 
initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] initializing bookkeeper for shadowManagedLedger; 
ledgers {}", name, ledgers);
+        }
+
+        // Calculate total entries and size
+        Iterator<LedgerInfo> iterator = ledgers.values().iterator();
+        while (iterator.hasNext()) {
+            LedgerInfo li = iterator.next();
+            if (li.getEntries() > 0) {
+                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
+                TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
+            } else if (li.getLedgerId() != currentLedger.getId()) {
+                //do not remove the last empty ledger.
+                iterator.remove();
+            }
+        }
+
+        initLastConfirmedEntry();
+        // Save it back to ensure all nodes exist and properties are persisted.
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, 
new MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                ledgersStat = stat;
+                initializeCursors(callback);
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                handleBadVersion(e);
+                callback.initializeFailed(new ManagedLedgerException(e));
+            }
+        });
+    }
+
+    private void initLastConfirmedEntry() {
+        if (lastConfirmedEntry != null || currentLedger == null) {
+            return;
+        }
+        lastConfirmedEntry = new PositionImpl(currentLedger.getId(), 
currentLedger.getLastAddConfirmed());
+        // bypass empty ledgers, find last ledger with Message if possible.
+        while (lastConfirmedEntry.getEntryId() == -1) {
+            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+            if (formerLedger != null) {
+                LedgerInfo ledgerInfo = formerLedger.getValue();
+                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
+        if (!beforeAddEntry(addOperation)) {
+            return;
+        }
+        if (state == State.Terminated) {
+            addOperation.failed(new 
ManagedLedgerException.ManagedLedgerTerminatedException(
+                    "Managed ledger was already terminated"));
+            return;
+        }

Review Comment:
   Looks like we don't need to check if the topic is terminated.
   And if the broker receives the zookeeper notification first, then the shadow 
topic changes to the terminated state. But the shadow replicator hasn't reached 
the end of the source topic. In this case, the consumer connected to the shadow 
topic will have a different view.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {
+            this.shadowSource =
+                    
TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex());
+        } else {
+            this.shadowSource = TopicName.get(config.getShadowSource());
+        }
+        this.sourceMLName =
+                shadowSource.getPersistenceNamingEncoding();
     }
 
+    /**
+     * ShadowManagedLedger init steps:
+     * 1. this.initialize : read source managedLedgerInfo
+     * 2. super.initialize : read its own read source managedLedgerInfo
+     * 3. this.initializeBookKeeper
+     * 4. super.initializeCursors
+     * @param callback
+     * @param ctx
+     */
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
-        // TODO: ShadowManagedLedger has different initialize process from 
normal ManagedLedger,
-        //  which is complicated and will be implemented in the next PRs.
-        super.initialize(callback, ctx);
+        log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
+
+        executor.executeOrdered(name, safeRun(() -> doInitialize(callback, 
ctx)));
+    }
+
+    private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
+        // Fetch the list of existing ledgers in the source managed ledger
+        store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
+                executor.executeOrdered(name, safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+        );
+        store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Source ML info:{}", name, 
sourceMLName, mlInfo);
+                }
+                sourceLedgersStat = stat;
+                if (mlInfo.getLedgerInfoCount() == 0) {
+                    // Small chance here, since shadow topic is created after 
source topic exists.
+                    log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName,
+                            mlInfo, stat);
+                    ShadowManagedLedgerImpl.super.initialize(callback, ctx);
+                    return;
+                }
+
+                if (mlInfo.hasTerminatedPosition()) {
+                    state = State.Terminated;
+                    lastConfirmedEntry = new 
PositionImpl(mlInfo.getTerminatedPosition());
+                    log.info("[{}][{}] Recovering managed ledger terminated at 
{}", name, sourceMLName,
+                            lastConfirmedEntry);
+                }
+
+                for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
+                    ledgers.put(ls.getLedgerId(), ls);
+                }
+
+                final long lastLedgerId = ledgers.lastKey();
+                mbean.startDataLedgerOpenOp();
+                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.executeOrdered(name, safeRun(() -> {
+                    mbean.endDataLedgerOpenOp();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Opened source ledger {}", name, 
lastLedgerId);
+                    }
+                    if (rc == BKException.Code.OK) {
+                        LedgerInfo info =
+                                LedgerInfo.newBuilder()
+                                        .setLedgerId(lastLedgerId)
+                                        .setEntries(lh.getLastAddConfirmed() + 
1)
+                                        .setSize(lh.getLength())
+                                        .setTimestamp(clock.millis()).build();
+                        ledgers.put(lastLedgerId, info);
+
+                        //Always consider the last ledger is opened in source.
+                        STATE_UPDATER.set(ShadowManagedLedgerImpl.this, 
State.LedgerOpened);
+                        currentLedger = lh;
+
+                        if (managedLedgerInterceptor != null) {
+                            
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
+                                    .thenRun(() -> 
ShadowManagedLedgerImpl.super.initialize(callback, ctx))
+                                    .exceptionally(ex -> {
+                                        callback.initializeFailed(
+                                                new 
ManagedLedgerException.ManagedLedgerInterceptException(
+                                                        ex.getCause()));
+                                        return null;
+                                    });
+                        } else {
+                            ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                        }
+                    } else if (isNoSuchLedgerExistsException(rc)) {
+                        log.warn("[{}] Source ledger not found: {}", name, 
lastLedgerId);
+                        ledgers.remove(lastLedgerId);
+                        ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                    } else {
+                        log.error("[{}] Failed to open source ledger {}: {}", 
name, lastLedgerId,
+                                BKException.getMessage(rc));
+                        
callback.initializeFailed(createManagedLedgerException(rc));
+                    }
+                }));
+                //open ledger in readonly mode.
+                bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(), opencb, null);
+
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                if (e instanceof 
ManagedLedgerException.MetadataNotFoundException) {
+                    callback.initializeFailed(new 
ManagedLedgerException.ManagedLedgerNotFoundException(e));
+                } else {
+                    callback.initializeFailed(new ManagedLedgerException(e));
+                }
+            }
+        });
     }
 
     public TopicName getShadowSource() {
         return shadowSource;
     }
+
+    @Override
+    protected boolean isLedgersReadonly() {
+        return true;
+    }
+
+    @Override
+    protected synchronized void 
initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] initializing bookkeeper for shadowManagedLedger; 
ledgers {}", name, ledgers);
+        }
+
+        // Calculate total entries and size
+        Iterator<LedgerInfo> iterator = ledgers.values().iterator();
+        while (iterator.hasNext()) {
+            LedgerInfo li = iterator.next();
+            if (li.getEntries() > 0) {
+                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
+                TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
+            } else if (li.getLedgerId() != currentLedger.getId()) {
+                //do not remove the last empty ledger.
+                iterator.remove();
+            }
+        }
+
+        initLastConfirmedEntry();
+        // Save it back to ensure all nodes exist and properties are persisted.
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, 
new MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                ledgersStat = stat;
+                initializeCursors(callback);
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                handleBadVersion(e);
+                callback.initializeFailed(new ManagedLedgerException(e));
+            }
+        });
+    }
+
+    private void initLastConfirmedEntry() {
+        if (lastConfirmedEntry != null || currentLedger == null) {
+            return;
+        }
+        lastConfirmedEntry = new PositionImpl(currentLedger.getId(), 
currentLedger.getLastAddConfirmed());
+        // bypass empty ledgers, find last ledger with Message if possible.
+        while (lastConfirmedEntry.getEntryId() == -1) {
+            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+            if (formerLedger != null) {
+                LedgerInfo ledgerInfo = formerLedger.getValue();
+                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
+        if (!beforeAddEntry(addOperation)) {
+            return;
+        }
+        if (state == State.Terminated) {
+            addOperation.failed(new 
ManagedLedgerException.ManagedLedgerTerminatedException(
+                    "Managed ledger was already terminated"));
+            return;
+        }
+        if (state != State.LedgerOpened) {
+            addOperation.failed(new ManagedLedgerException("Managed ledger is 
not opened"));
+            return;
+        }
+
+        if (addOperation.getCtx() == null || !(addOperation.getCtx() 
instanceof Position position)) {
+            addOperation.failed(new ManagedLedgerException("Illegal 
addOperation context object."));
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Add entry into shadow ledger lh={} entries={}, 
pos=({},{})",
+                    name, currentLedger.getId(), currentLedgerEntries, 
position.getLedgerId(), position.getEntryId());
+        }
+        pendingAddEntries.add(addOperation);
+        if (position.getLedgerId() == currentLedger.getId()) {
+            // Write into lastLedger
+            addOperation.setLedger(currentLedger);
+            currentLedgerEntries = position.getEntryId();
+            currentLedgerSize += addOperation.data.readableBytes();
+            addOperation.initiateShadowWrite();
+        }

Review Comment:
   How about the position's ledger ID that is not equal to the current ledger 
ID? It looks like the `addOperation` will never be complete.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java:
##########
@@ -18,40 +18,363 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static 
org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.AsyncCallback;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.Stat;
 
 /**
  * Working in progress until <a 
href="https://github.com/apache/pulsar/issues/16153";>PIP-180</a> is finished.
- * Currently, it works nothing different with ManagedLedgerImpl.
  */
 @Slf4j
 public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
 
     private final TopicName shadowSource;
     private final String sourceMLName;
+    private volatile Stat sourceLedgersStat;
 
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, 
BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
                                    String name, final Supplier<Boolean> 
mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, 
mlOwnershipChecker);
-        this.shadowSource = TopicName.get(config.getShadowSource());
-        this.sourceMLName = shadowSource.getPersistenceNamingEncoding();
+        if (config.getTopicName().isPartitioned() && 
TopicName.getPartitionIndex(config.getShadowSource()) == -1) {
+            this.shadowSource =
+                    
TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex());
+        } else {
+            this.shadowSource = TopicName.get(config.getShadowSource());
+        }
+        this.sourceMLName =
+                shadowSource.getPersistenceNamingEncoding();
     }
 
+    /**
+     * ShadowManagedLedger init steps:
+     * 1. this.initialize : read source managedLedgerInfo
+     * 2. super.initialize : read its own read source managedLedgerInfo
+     * 3. this.initializeBookKeeper
+     * 4. super.initializeCursors
+     * @param callback
+     * @param ctx
+     */
     @Override
     synchronized void initialize(ManagedLedgerInitializeLedgerCallback 
callback, Object ctx) {
-        // TODO: ShadowManagedLedger has different initialize process from 
normal ManagedLedger,
-        //  which is complicated and will be implemented in the next PRs.
-        super.initialize(callback, ctx);
+        log.info("Opening shadow managed ledger {} with source={}", name, 
sourceMLName);
+
+        executor.executeOrdered(name, safeRun(() -> doInitialize(callback, 
ctx)));
+    }
+
+    private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, 
Object ctx) {
+        // Fetch the list of existing ledgers in the source managed ledger
+        store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
+                executor.executeOrdered(name, safeRun(() -> 
processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
+        );
+        store.getManagedLedgerInfo(sourceMLName, false, null, new 
MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}][{}] Source ML info:{}", name, 
sourceMLName, mlInfo);
+                }
+                sourceLedgersStat = stat;
+                if (mlInfo.getLedgerInfoCount() == 0) {
+                    // Small chance here, since shadow topic is created after 
source topic exists.
+                    log.warn("[{}] Source topic ledger list is empty! 
source={},mlInfo={},stat={}", name, sourceMLName,
+                            mlInfo, stat);
+                    ShadowManagedLedgerImpl.super.initialize(callback, ctx);
+                    return;
+                }
+
+                if (mlInfo.hasTerminatedPosition()) {
+                    state = State.Terminated;
+                    lastConfirmedEntry = new 
PositionImpl(mlInfo.getTerminatedPosition());
+                    log.info("[{}][{}] Recovering managed ledger terminated at 
{}", name, sourceMLName,
+                            lastConfirmedEntry);
+                }
+
+                for (LedgerInfo ls : mlInfo.getLedgerInfoList()) {
+                    ledgers.put(ls.getLedgerId(), ls);
+                }
+
+                final long lastLedgerId = ledgers.lastKey();
+                mbean.startDataLedgerOpenOp();
+                AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> 
executor.executeOrdered(name, safeRun(() -> {
+                    mbean.endDataLedgerOpenOp();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Opened source ledger {}", name, 
lastLedgerId);
+                    }
+                    if (rc == BKException.Code.OK) {
+                        LedgerInfo info =
+                                LedgerInfo.newBuilder()
+                                        .setLedgerId(lastLedgerId)
+                                        .setEntries(lh.getLastAddConfirmed() + 
1)
+                                        .setSize(lh.getLength())
+                                        .setTimestamp(clock.millis()).build();
+                        ledgers.put(lastLedgerId, info);
+
+                        //Always consider the last ledger is opened in source.
+                        STATE_UPDATER.set(ShadowManagedLedgerImpl.this, 
State.LedgerOpened);
+                        currentLedger = lh;
+
+                        if (managedLedgerInterceptor != null) {
+                            
managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh)
+                                    .thenRun(() -> 
ShadowManagedLedgerImpl.super.initialize(callback, ctx))
+                                    .exceptionally(ex -> {
+                                        callback.initializeFailed(
+                                                new 
ManagedLedgerException.ManagedLedgerInterceptException(
+                                                        ex.getCause()));
+                                        return null;
+                                    });
+                        } else {
+                            ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                        }
+                    } else if (isNoSuchLedgerExistsException(rc)) {
+                        log.warn("[{}] Source ledger not found: {}", name, 
lastLedgerId);
+                        ledgers.remove(lastLedgerId);
+                        ShadowManagedLedgerImpl.super.initialize(callback, 
ctx);
+                    } else {
+                        log.error("[{}] Failed to open source ledger {}: {}", 
name, lastLedgerId,
+                                BKException.getMessage(rc));
+                        
callback.initializeFailed(createManagedLedgerException(rc));
+                    }
+                }));
+                //open ledger in readonly mode.
+                bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, 
config.getPassword(), opencb, null);
+
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                if (e instanceof 
ManagedLedgerException.MetadataNotFoundException) {
+                    callback.initializeFailed(new 
ManagedLedgerException.ManagedLedgerNotFoundException(e));
+                } else {
+                    callback.initializeFailed(new ManagedLedgerException(e));
+                }
+            }
+        });
     }
 
     public TopicName getShadowSource() {
         return shadowSource;
     }
+
+    @Override
+    protected boolean isLedgersReadonly() {
+        return true;
+    }
+
+    @Override
+    protected synchronized void 
initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] initializing bookkeeper for shadowManagedLedger; 
ledgers {}", name, ledgers);
+        }
+
+        // Calculate total entries and size
+        Iterator<LedgerInfo> iterator = ledgers.values().iterator();
+        while (iterator.hasNext()) {
+            LedgerInfo li = iterator.next();
+            if (li.getEntries() > 0) {
+                NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries());
+                TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize());
+            } else if (li.getLedgerId() != currentLedger.getId()) {
+                //do not remove the last empty ledger.
+                iterator.remove();
+            }
+        }
+
+        initLastConfirmedEntry();
+        // Save it back to ensure all nodes exist and properties are persisted.
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, 
new MetaStore.MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(Void result, Stat stat) {
+                ledgersStat = stat;
+                initializeCursors(callback);
+            }
+
+            @Override
+            public void 
operationFailed(ManagedLedgerException.MetaStoreException e) {
+                handleBadVersion(e);
+                callback.initializeFailed(new ManagedLedgerException(e));
+            }
+        });
+    }
+
+    private void initLastConfirmedEntry() {
+        if (lastConfirmedEntry != null || currentLedger == null) {
+            return;
+        }
+        lastConfirmedEntry = new PositionImpl(currentLedger.getId(), 
currentLedger.getLastAddConfirmed());
+        // bypass empty ledgers, find last ledger with Message if possible.
+        while (lastConfirmedEntry.getEntryId() == -1) {
+            Map.Entry<Long, LedgerInfo> formerLedger = 
ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
+            if (formerLedger != null) {
+                LedgerInfo ledgerInfo = formerLedger.getValue();
+                lastConfirmedEntry = 
PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
+            } else {
+                break;
+            }
+        }
+    }
+
+    @Override
+    protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) 
{
+        if (!beforeAddEntry(addOperation)) {
+            return;
+        }
+        if (state == State.Terminated) {
+            addOperation.failed(new 
ManagedLedgerException.ManagedLedgerTerminatedException(
+                    "Managed ledger was already terminated"));
+            return;
+        }
+        if (state != State.LedgerOpened) {
+            addOperation.failed(new ManagedLedgerException("Managed ledger is 
not opened"));
+            return;
+        }
+
+        if (addOperation.getCtx() == null || !(addOperation.getCtx() 
instanceof Position position)) {
+            addOperation.failed(new ManagedLedgerException("Illegal 
addOperation context object."));
+            return;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Add entry into shadow ledger lh={} entries={}, 
pos=({},{})",
+                    name, currentLedger.getId(), currentLedgerEntries, 
position.getLedgerId(), position.getEntryId());
+        }
+        pendingAddEntries.add(addOperation);
+        if (position.getLedgerId() == currentLedger.getId()) {
+            // Write into lastLedger
+            addOperation.setLedger(currentLedger);
+            currentLedgerEntries = position.getEntryId();
+            currentLedgerSize += addOperation.data.readableBytes();
+            addOperation.initiateShadowWrite();
+        }
+        lastAddEntryTimeMs = System.currentTimeMillis();
+    }
+
+    /**
+     * Handle source ManagedLedgerInfo updates.
+     * Update types:
+     * 1. new ledgers.
+     * 2. old ledgers deleted.
+     * 3. old ledger offload info updated (including ledger deleted from 
bookie by offloader)
+     *
+     */
+    private synchronized void 
processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat 
stat) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, 
prevStat={},stat={}", name, sourceMLName, mlInfo,
+                    sourceLedgersStat, stat);
+        }
+
+        sourceLedgersStat = stat;
+
+        if (mlInfo.hasTerminatedPosition()) {
+            state = State.Terminated;

Review Comment:
   Do you need Terminated state for a shadow ledger? If the source topic 
changes to the terminated state, the shadow topic will not get any new messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to