diff --git 
deleted file mode 100644
index 2486297..0000000
+++ /dev/null
@@ -1,1325 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.DLIllegalStateException;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException;
-import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.function.GetLastTxIdFunction;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryStore;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import com.twitter.distributedlog.metadata.LogMetadataForWriter;
-import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.logsegment.LogSegmentFilter;
-import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache;
-import com.twitter.distributedlog.logsegment.RollingPolicy;
-import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
-import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
-import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
-import com.twitter.distributedlog.metadata.MetadataUpdater;
-import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import com.twitter.distributedlog.util.Allocator;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Transaction;
-import com.twitter.distributedlog.util.PermitLimiter;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import static 
- * Log Handler for Writers.
- *
- * <h3>Metrics</h3>
- * All the metrics about log write handler are exposed under scope `segments`.
- * <ul>
- * <li> `segments`/open : opstats. latency characteristics on starting a new 
log segment.
- * <li> `segments`/close : opstats. latency characteristics on completing an 
inprogress log segment.
- * <li> `segments`/recover : opstats. latency characteristics on recovering a 
log segment.
- * <li> `segments`/delete : opstats. latency characteristics on deleting a log 
- * </ul>
- */
-class BKLogWriteHandler extends BKLogHandler {
-    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
-    private static Transaction.OpListener<LogSegmentEntryWriter> 
-            new Transaction.OpListener<LogSegmentEntryWriter>() {
-        @Override
-        public void onCommit(LogSegmentEntryWriter r) {
-            // no-op
-        }
-        @Override
-        public void onAbort(Throwable t) {
-            // no-op
-        }
-    };
-    protected final LogMetadataForWriter logMetadataForWriter;
-    protected final Allocator<LogSegmentEntryWriter, Object> 
-    protected final DistributedLock lock;
-    protected final MaxTxId maxTxId;
-    protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
-    protected final boolean validateLogSegmentSequenceNumber;
-    protected final int regionId;
-    protected final RollingPolicy rollingPolicy;
-    protected Future<? extends DistributedLock> lockFuture = null;
-    protected final PermitLimiter writeLimiter;
-    protected final FeatureProvider featureProvider;
-    protected final DynamicDistributedLogConfiguration dynConf;
-    protected final MetadataUpdater metadataUpdater;
-    // tracking the inprogress log segments
-    protected final LinkedList<Long> inprogressLSSNs;
-    // Fetch LogSegments State: write can continue without full list of log 
segments while truncation needs
-    private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
-    private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
-    // Recover Functions
-    private final RecoverLogSegmentFunction recoverLogSegmentFunction =
-            new RecoverLogSegmentFunction();
-    private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> 
recoverLogSegmentsFunction =
-            new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() {
-                @Override
-                public Future<Long> apply(List<LogSegmentMetadata> 
segmentList) {
-          "Initiating Recovery For {} : {}", 
getFullyQualifiedName(), segmentList);
-                    // if lastLedgerRollingTimeMillis is not updated, we set 
it to now.
-                    synchronized (BKLogWriteHandler.this) {
-                        if (lastLedgerRollingTimeMillis < 0) {
-                            lastLedgerRollingTimeMillis = Utils.nowInMillis();
-                        }
-                    }
-                    if (validateLogSegmentSequenceNumber) {
-                        synchronized (inprogressLSSNs) {
-                            for (LogSegmentMetadata segment : segmentList) {
-                                if (segment.isInProgress()) {
-                                }
-                            }
-                        }
-                    }
-                    return FutureUtils.processList(segmentList, 
recoverLogSegmentFunction, scheduler).map(
-                            GetLastTxIdFunction.INSTANCE);
-                }
-            };
-    // Stats
-    private final StatsLogger perLogStatsLogger;
-    private final OpStatsLogger closeOpStats;
-    private final OpStatsLogger openOpStats;
-    private final OpStatsLogger recoverOpStats;
-    private final OpStatsLogger deleteOpStats;
-    /**
-     * Construct a Bookkeeper journal manager.
-     */
-    BKLogWriteHandler(LogMetadataForWriter logMetadata,
-                      DistributedLogConfiguration conf,
-                      LogStreamMetadataStore streamMetadataStore,
-                      LogSegmentMetadataCache metadataCache,
-                      LogSegmentEntryStore entryStore,
-                      OrderedScheduler scheduler,
-                      Allocator<LogSegmentEntryWriter, Object> 
-                      StatsLogger statsLogger,
-                      StatsLogger perLogStatsLogger,
-                      AlertStatsLogger alertStatsLogger,
-                      String clientId,
-                      int regionId,
-                      PermitLimiter writeLimiter,
-                      FeatureProvider featureProvider,
-                      DynamicDistributedLogConfiguration dynConf,
-                      DistributedLock lock /** owned by handler **/) {
-        super(logMetadata,
-                conf,
-                streamMetadataStore,
-                metadataCache,
-                entryStore,
-                scheduler,
-                statsLogger,
-                alertStatsLogger,
-                clientId);
-        this.logMetadataForWriter = logMetadata;
-        this.logSegmentAllocator = segmentAllocator;
-        this.perLogStatsLogger = perLogStatsLogger;
-        this.writeLimiter = writeLimiter;
-        this.featureProvider = featureProvider;
-        this.dynConf = dynConf;
-        this.lock = lock;
-        this.metadataUpdater = 
LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
-        if (conf.getEncodeRegionIDInLogSegmentMetadata()) {
-            this.regionId = regionId;
-        } else {
-            this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
-        }
-        this.validateLogSegmentSequenceNumber = 
-        // Construct the max sequence no
-        maxLogSegmentSequenceNo = new 
-        inprogressLSSNs = new LinkedList<Long>();
-        // Construct the max txn id.
-        maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());
-        // Schedule fetching log segment list in background before we access 
-        // We don't need to watch the log segment list changes for writer, as 
it manages log segment list.
-        fetchForWrite = readLogSegmentsFromStore(
-                LogSegmentMetadata.COMPARATOR,
-                WRITE_HANDLE_FILTER,
-                null);
-        // Initialize other parameters.
-        setLastLedgerRollingTimeMillis(Utils.nowInMillis());
-        // Rolling Policy
-        if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
-            rollingPolicy = new 
TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L);
-        } else {
-            rollingPolicy = new 
-        }
-        // Stats
-        StatsLogger segmentsStatsLogger = statsLogger.scope("segments");
-        openOpStats = segmentsStatsLogger.getOpStatsLogger("open");
-        closeOpStats = segmentsStatsLogger.getOpStatsLogger("close");
-        recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover");
-        deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
-    }
-    private Future<List<LogSegmentMetadata>> 
-            final Comparator<LogSegmentMetadata> comparator) {
-        final Promise<List<LogSegmentMetadata>> promise = new 
-        fetchForWrite.addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-            @Override
-            public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
-                try {
-                    FutureUtils.setValue(promise, 
-                } catch (UnexpectedException e) {
-                    FutureUtils.setException(promise, e);
-                }
-            }
-        });
-        return promise;
-    }
-    private Future<List<LogSegmentMetadata>> 
-            final Comparator<LogSegmentMetadata> comparator) {
-        Future<Versioned<List<LogSegmentMetadata>>> result;
-        synchronized (this) {
-            if (null == fetchForTruncation) {
-                fetchForTruncation = readLogSegmentsFromStore(
-                        LogSegmentMetadata.COMPARATOR,
-                        LogSegmentFilter.DEFAULT_FILTER,
-                        null);
-            }
-            result = fetchForTruncation;
-        }
-        final Promise<List<LogSegmentMetadata>> promise = new 
-        result.addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-            @Override
-            public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
-                try {
-                    FutureUtils.setValue(promise, 
-                } catch (UnexpectedException e) {
-                    FutureUtils.setException(promise, e);
-                }
-            }
-        });
-        return promise;
-    }
-    // Transactional operations for MaxLogSegmentSequenceNo
-    void storeMaxSequenceNumber(final Transaction<Object> txn,
-                                final MaxLogSegmentSequenceNo maxSeqNo,
-                                final long seqNo,
-                                final boolean isInprogress) {
-        metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, 
-                new Transaction.OpListener<Version>() {
-            @Override
-            public void onCommit(Version version) {
-                if (validateLogSegmentSequenceNumber) {
-                    synchronized (inprogressLSSNs) {
-                        if (isInprogress) {
-                            inprogressLSSNs.add(seqNo);
-                        } else {
-                            inprogressLSSNs.removeFirst();
-                        }
-                    }
-                }
-                maxSeqNo.update(version, seqNo);
-            }
-            @Override
-            public void onAbort(Throwable t) {
-                // no-op
-            }
-        });
-    }
-    // Transactional operations for MaxTxId
-    void storeMaxTxId(final Transaction<Object> txn,
-                      final MaxTxId maxTxId,
-                      final long txId) {
-        metadataStore.storeMaxTxnId(txn, logMetadataForWriter, 
-                new Transaction.OpListener<Version>() {
-                    @Override
-                    public void onCommit(Version version) {
maxTxId.update(version, txId);
-                    @Override
-                    public void onAbort(Throwable t) {
-                        // no-op
-                    }
-                });
-    }
-    // Transactional operations for logsegment
-    void writeLogSegment(final Transaction<Object> txn,
-                         final LogSegmentMetadata metadata) {
-        metadataStore.createLogSegment(txn, metadata, new 
Transaction.OpListener<Void>() {
-            @Override
-            public void onCommit(Void r) {
-                addLogSegmentToCache(metadata.getSegmentName(), metadata);
-            }
-            @Override
-            public void onAbort(Throwable t) {
-                // no-op
-            }
-        });
-    }
-    void deleteLogSegment(final Transaction<Object> txn,
-                          final LogSegmentMetadata metadata) {
-        metadataStore.deleteLogSegment(txn, metadata, new 
Transaction.OpListener<Void>() {
-            @Override
-            public void onCommit(Void r) {
-                removeLogSegmentFromCache(metadata.getSegmentName());
-            }
-            @Override
-            public void onAbort(Throwable t) {
-                // no-op
-            }
-        });
-    }
-    /**
-     * The caller could call this before any actions, which to hold the lock 
-     * the write handler of its whole lifecycle. The lock will only be released
-     * when closing the write handler.
-     *
-     * This method is useful to prevent releasing underlying zookeeper lock 
-     * recovering/completing log segments. Releasing underlying zookeeper lock 
-     * 1) increase latency when re-lock on starting new log segment. 2) 
increase the
-     * possibility of a stream being re-acquired by other instances.
-     *
-     * @return future represents the lock result
-     */
-    Future<? extends DistributedLock> lockHandler() {
-        if (null != lockFuture) {
-            return lockFuture;
-        }
-        lockFuture = lock.asyncAcquire();
-        return lockFuture;
-    }
-    Future<Void> unlockHandler() {
-        if (null != lockFuture) {
-            return lock.asyncClose();
-        } else {
-            return Future.Void();
-        }
-    }
-    /**
-     * Start a new log segment in a BookKeeper ledger.
-     * First ensure that we have the write lock for this journal.
-     * Then create a ledger and stream based on that ledger.
-     * The ledger id is written to the inprogress znode, so that in the
-     * case of a crash, a recovery process can find the ledger we were writing
-     * to when we crashed.
-     *
-     * @param txId First transaction id to be written to the stream
-     * @return
-     * @throws IOException
-     */
-    public BKLogSegmentWriter startLogSegment(long txId) throws IOException {
-        return startLogSegment(txId, false, false);
-    }
-    /**
-     * Start a new log segment in a BookKeeper ledger.
-     * First ensure that we have the write lock for this journal.
-     * Then create a ledger and stream based on that ledger.
-     * The ledger id is written to the inprogress znode, so that in the
-     * case of a crash, a recovery process can find the ledger we were writing
-     * to when we crashed.
-     *
-     * @param txId First transaction id to be written to the stream
-     * @param bestEffort
-     * @param allowMaxTxID
-     *          allow using max tx id to start log segment
-     * @return
-     * @throws IOException
-     */
-    public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, 
boolean allowMaxTxID)
-            throws IOException {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        boolean success = false;
-        try {
-            BKLogSegmentWriter writer = doStartLogSegment(txId, bestEffort, 
-            success = true;
-            return writer;
-        } finally {
-            if (success) {
-            } else {
-            }
-        }
-    }
-    protected long assignLogSegmentSequenceNumber() throws IOException {
-        // For any active stream we will always make sure that there is at 
least one
-        // active ledger (except when the stream first starts out). Therefore 
when we
-        // see no ledger metadata for a stream, we assume that this is the 
first ledger
-        // in the stream
-        long logSegmentSeqNo = 
-        boolean logSegmentsFound = false;
-        if 
-            List<LogSegmentMetadata> ledgerListDesc = 
-            Long nextLogSegmentSeqNo = 
-            if (null == nextLogSegmentSeqNo) {
-                logSegmentsFound = false;
-                // we don't find last assigned log segment sequence number
-                // then we start the log segment with configured 
-                logSegmentSeqNo = conf.getFirstLogSegmentSequenceNumber();
-            } else {
-                logSegmentsFound = true;
-                // latest log segment is assigned with a sequence number, 
start with next sequence number
-                logSegmentSeqNo = nextLogSegmentSeqNo;
-            }
-        }
-        // We only skip log segment sequence number validation only when no 
log segments found &
-        // the maximum log segment sequence number is "UNASSIGNED".
-        if (!logSegmentsFound &&
-            (DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO == 
maxLogSegmentSequenceNo.getSequenceNumber())) {
-            // no ledger seqno stored in /ledgers before
-  "No max ledger sequence number found while creating log 
segment {} for {}.",
-                logSegmentSeqNo, getFullyQualifiedName());
-        } else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 != 
logSegmentSeqNo) {
-            LOG.warn("Unexpected max log segment sequence number {} for {} : 
list of cached segments = {}",
-                new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), 
-                    getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
-            // there is max log segment number recorded there and it isn't 
match. throw exception.
-            throw new DLIllegalStateException("Unexpected max log segment 
sequence number "
-                + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + 
-                + ", expected " + (logSegmentSeqNo - 1));
-        }
-        return logSegmentSeqNo;
-    }
-    protected BKLogSegmentWriter doStartLogSegment(long txId, boolean 
bestEffort, boolean allowMaxTxID) throws IOException {
-        return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, 
-    }
-    protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
-                                                              final boolean 
-                                                              final boolean 
allowMaxTxID) {
-        final Promise<BKLogSegmentWriter> promise = new 
-        try {
-            lock.checkOwnershipAndReacquire();
-        } catch (LockingException e) {
-            FutureUtils.setException(promise, e);
-            return promise;
-        }
-        fetchForWrite.addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-            @Override
-            public void onSuccess(Versioned<List<LogSegmentMetadata>> list) {
-                doStartLogSegment(txId, bestEffort, allowMaxTxID, promise);
-            }
-        });
-        return promise;
-    }
-    protected void doStartLogSegment(final long txId,
-                                     final boolean bestEffort,
-                                     final boolean allowMaxTxID,
-                                     final Promise<BKLogSegmentWriter> 
promise) {
-        // validate the tx id
-        if ((txId < 0) ||
-                (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) 
-            FutureUtils.setException(promise, new IOException("Invalid 
Transaction Id " + txId));
-            return;
-        }
-        long highestTxIdWritten = maxTxId.get();
-        if (txId < highestTxIdWritten) {
-            if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
-                LOG.error("We've already marked the stream as ended and 
attempting to start a new log segment");
-                FutureUtils.setException(promise, new 
EndOfStreamException("Writing to a stream after it has been marked as 
-                return;
-            } else {
-                LOG.error("We've already seen TxId {} the max TXId is {}", 
txId, highestTxIdWritten);
-                FutureUtils.setException(promise, new 
TransactionIdOutOfOrderException(txId, highestTxIdWritten));
-                return;
-            }
-        }
-        try {
-            logSegmentAllocator.allocate();
-        } catch (IOException e) {
-            // failed to issue an allocation request
-            failStartLogSegment(promise, bestEffort, e);
-            return;
-        }
-        // start the transaction from zookeeper
-        final Transaction<Object> txn = streamMetadataStore.newTransaction();
-        // failpoint injected before creating ledger
-        try {
-        } catch (IOException ioe) {
-            failStartLogSegment(promise, bestEffort, ioe);
-            return;
-        }
-        logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER)
-                .addEventListener(new 
FutureEventListener<LogSegmentEntryWriter>() {
-            @Override
-            public void onSuccess(LogSegmentEntryWriter entryWriter) {
-                // try-obtain succeed
-                createInprogressLogSegment(
-                        txn,
-                        txId,
-                        entryWriter,
-                        bestEffort,
-                        promise);
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                failStartLogSegment(promise, bestEffort, cause);
-            }
-        });
-    }
-    private void failStartLogSegment(Promise<BKLogSegmentWriter> promise,
-                                     boolean bestEffort,
-                                     Throwable cause) {
-        if (bestEffort) {
-            FutureUtils.setValue(promise, null);
-        } else {
-            FutureUtils.setException(promise, cause);
-        }
-    }
-    // once the ledger handle is obtained from allocator, this function should 
-    // either the transaction is executed or aborted. Otherwise, the ledger 
handle will
-    // just leak from the allocation pool - hence cause "No Ledger Allocator"
-    private void createInprogressLogSegment(Transaction<Object> txn,
-                                            final long txId,
-                                            final LogSegmentEntryWriter 
-                                            boolean bestEffort,
-                                            final Promise<BKLogSegmentWriter> 
promise) {
-        final long logSegmentSeqNo;
-        try {
-            FailpointUtils.checkFailPoint(
-            logSegmentSeqNo = assignLogSegmentSequenceNumber();
-        } catch (IOException e) {
-            // abort the current prepared transaction
-            txn.abort(e);
-            failStartLogSegment(promise, bestEffort, e);
-            return;
-        }
-        final String inprogressZnodePath = inprogressZNode(
-                entryWriter.getLogSegmentId(), txId, logSegmentSeqNo);
-        final LogSegmentMetadata l =
-            new 
-                conf.getDLLedgerMetadataLayoutVersion(), 
entryWriter.getLogSegmentId(), txId)
-                    .setLogSegmentSequenceNo(logSegmentSeqNo)
-                    .setRegionId(regionId)
-                    .setEnvelopeEntries(
-                    .build();
-        // Create an inprogress segment
-        writeLogSegment(txn, l);
-        // Try storing max sequence number.
-        LOG.debug("Try storing max sequence number in startLogSegment {} : 
{}", inprogressZnodePath, logSegmentSeqNo);
-        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, 
-        // Try storing max tx id.
-        LOG.debug("Try storing MaxTxId in startLogSegment  {} {}", 
inprogressZnodePath, txId);
-        storeMaxTxId(txn, maxTxId, txId);
-        txn.execute().addEventListener(FutureEventListenerRunnable.of(new 
FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                try {
-                    FutureUtils.setValue(promise, new BKLogSegmentWriter(
-                            getFullyQualifiedName(),
-                            l.getSegmentName(),
-                            conf,
-                            conf.getDLLedgerMetadataLayoutVersion(),
-                            entryWriter,
-                            lock,
-                            txId,
-                            logSegmentSeqNo,
-                            scheduler,
-                            statsLogger,
-                            perLogStatsLogger,
-                            alertStatsLogger,
-                            writeLimiter,
-                            featureProvider,
-                            dynConf));
-                } catch (IOException ioe) {
-                    failStartLogSegment(promise, false, ioe);
-                }
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                failStartLogSegment(promise, false, cause);
-            }
-        }, scheduler));
-    }
-    boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
-        return rollingPolicy.shouldRollover(writer, 
-    }
-    /**
-     * Finalize a log segment. If the journal manager is currently
-     * writing to a ledger, ensure that this is the ledger of the log segment
-     * being finalized.
-     * <p/>
-     * Otherwise this is the recovery case. In the recovery case, ensure that
-     * the firstTxId of the ledger matches firstTxId for the segment we are
-     * trying to finalize.
-     */
-    Future<LogSegmentMetadata> completeAndCloseLogSegment(final 
BKLogSegmentWriter writer) {
-        final Promise<LogSegmentMetadata> promise = new 
-        completeAndCloseLogSegment(writer, promise);
-        return promise;
-    }
-    private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
-                                            final Promise<LogSegmentMetadata> 
promise) {
-        writer.asyncClose().addEventListener(new FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                // in theory closeToFinalize should throw exception if a 
stream is in error.
-                // just in case, add another checking here to make sure we 
don't close log segment is a stream is in error.
-                if (writer.shouldFailCompleteLogSegment()) {
-                    FutureUtils.setException(promise,
-                            new IOException("LogSegmentWriter for " + 
writer.getFullyQualifiedLogSegment() + " is already in error."));
-                    return;
-                }
-                doCompleteAndCloseLogSegment(
-                        inprogressZNodeName(writer.getLogSegmentId(), 
writer.getStartTxId(), writer.getLogSegmentSequenceNumber()),
-                        writer.getLogSegmentSequenceNumber(),
-                        writer.getLogSegmentId(),
-                        writer.getStartTxId(),
-                        writer.getLastTxId(),
-                        writer.getPositionWithinLogSegment(),
-                        writer.getLastDLSN().getEntryId(),
-                        writer.getLastDLSN().getSlotId(),
-                        promise);
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-        });
-    }
-    @VisibleForTesting
-    LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo,
-                                                  long logSegmentId,
-                                                  long firstTxId,
-                                                  long lastTxId,
-                                                  int recordCount)
-        throws IOException {
-        return completeAndCloseLogSegment(inprogressZNodeName(logSegmentId, 
firstTxId, logSegmentSeqNo), logSegmentSeqNo,
-            logSegmentId, firstTxId, lastTxId, recordCount, -1, -1);
-    }
-    /**
-     * Finalize a log segment. If the journal manager is currently
-     * writing to a ledger, ensure that this is the ledger of the log segment
-     * being finalized.
-     * <p/>
-     * Otherwise this is the recovery case. In the recovery case, ensure that
-     * the firstTxId of the ledger matches firstTxId for the segment we are
-     * trying to finalize.
-     */
-    LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, 
long logSegmentSeqNo,
-                                                  long logSegmentId, long 
firstTxId, long lastTxId,
-                                                  int recordCount, long 
lastEntryId, long lastSlotId)
-            throws IOException {
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        boolean success = false;
-        try {
-            LogSegmentMetadata completedLogSegment =
-                    doCompleteAndCloseLogSegment(inprogressZnodeName, 
-                            logSegmentId, firstTxId, lastTxId, recordCount,
-                            lastEntryId, lastSlotId);
-            success = true;
-            return completedLogSegment;
-        } finally {
-            if (success) {
-            } else {
-            }
-        }
-    }
-    protected long computeStartSequenceId(LogSegmentMetadata segment) throws 
IOException {
-        if (!segment.isInProgress()) {
-            return segment.getStartSequenceId();
-        }
-        long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
-        // we only record sequence id when both write version and logsegment's 
version support sequence id
-        if 
-                && segment.supportsSequenceId()) {
-            List<LogSegmentMetadata> logSegmentDescList =
-                    getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
-            startSequenceId = 
DLUtils.computeStartSequenceId(logSegmentDescList, segment);
-        }
-        return startSequenceId;
-    }
-    /**
-     * Close log segment
-     *
-     * @param inprogressZnodeName
-     * @param logSegmentSeqNo
-     * @param logSegmentId
-     * @param firstTxId
-     * @param lastTxId
-     * @param recordCount
-     * @param lastEntryId
-     * @param lastSlotId
-     * @throws IOException
-     */
-    protected LogSegmentMetadata doCompleteAndCloseLogSegment(
-            String inprogressZnodeName,
-            long logSegmentSeqNo,
-            long logSegmentId,
-            long firstTxId,
-            long lastTxId,
-            int recordCount,
-            long lastEntryId,
-            long lastSlotId) throws IOException {
-        Promise<LogSegmentMetadata> promise = new 
-        doCompleteAndCloseLogSegment(
-                inprogressZnodeName,
-                logSegmentSeqNo,
-                logSegmentId,
-                firstTxId,
-                lastTxId,
-                recordCount,
-                lastEntryId,
-                lastSlotId,
-                promise);
-        return FutureUtils.result(promise);
-    }
-    protected void doCompleteAndCloseLogSegment(final String 
-                                                final long logSegmentSeqNo,
-                                                final long logSegmentId,
-                                                final long firstTxId,
-                                                final long lastTxId,
-                                                final int recordCount,
-                                                final long lastEntryId,
-                                                final long lastSlotId,
-                                                final 
Promise<LogSegmentMetadata> promise) {
-        fetchForWrite.addEventListener(new 
FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-            @Override
-            public void onSuccess(Versioned<List<LogSegmentMetadata>> 
segments) {
-                doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
-                        inprogressZnodeName,
-                        logSegmentSeqNo,
-                        logSegmentId,
-                        firstTxId,
-                        lastTxId,
-                        recordCount,
-                        lastEntryId,
-                        lastSlotId,
-                        promise);
-            }
-        });
-    }
-    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
-            final String inprogressZnodeName,
-            long logSegmentSeqNo,
-            long logSegmentId,
-            long firstTxId,
-            long lastTxId,
-            int recordCount,
-            long lastEntryId,
-            long lastSlotId,
-            final Promise<LogSegmentMetadata> promise) {
-        try {
-            lock.checkOwnershipAndReacquire();
-        } catch (IOException ioe) {
-            FutureUtils.setException(promise, ioe);
-            return;
-        }
-        LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, 
-        LogSegmentMetadata inprogressLogSegment = 
-        // validate log segment
-        if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
-            FutureUtils.setException(promise, new IOException(
-                "Active ledger has different ID to inprogress. "
-                    + inprogressLogSegment.getLogSegmentId() + " found, "
-                    + logSegmentId + " expected"));
-            return;
-        }
-        // validate the transaction id
-        if (inprogressLogSegment.getFirstTxId() != firstTxId) {
-            FutureUtils.setException(promise, new IOException("Transaction id 
not as expected, "
-                + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId 
+ " expected"));
-            return;
-        }
-        // validate the log sequence number
-        if (validateLogSegmentSequenceNumber) {
-            synchronized (inprogressLSSNs) {
-                if (inprogressLSSNs.isEmpty()) {
-                    FutureUtils.setException(promise, new UnexpectedException(
-                            "Didn't find matched inprogress log segments when 
completing inprogress "
-                                    + inprogressLogSegment));
-                    return;
-                }
-                long leastInprogressLSSN = inprogressLSSNs.getFirst();
-                // the log segment sequence number in metadata {@link 
-                // should be same as the sequence number we are completing 
-                // and
-                // it should also be same as the least inprogress log segment 
sequence number tracked in {@link inprogressLSSNs}
-                if ((inprogressLogSegment.getLogSegmentSequenceNumber() != 
logSegmentSeqNo) ||
-                        (leastInprogressLSSN != logSegmentSeqNo)) {
-                    FutureUtils.setException(promise, new UnexpectedException(
-                            "Didn't find matched inprogress log segments when 
completing inprogress "
-                                    + inprogressLogSegment));
-                    return;
-                }
-            }
-        }
-        // store max sequence number.
-        long maxSeqNo= Math.max(logSegmentSeqNo, 
-        if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo ||
-                (maxLogSegmentSequenceNo.getSequenceNumber() == 
logSegmentSeqNo + 1)) {
-            // ignore the case that a new inprogress log segment is 
-            // before completing current inprogress one
-  "Try storing max sequence number {} in completing {}.",
-                    new Object[] { logSegmentSeqNo, 
inprogressLogSegment.getZkPath() });
-        } else {
-            LOG.warn("Unexpected max ledger sequence number {} found while 
completing log segment {} for {}",
-                    new Object[] { 
maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, 
getFullyQualifiedName() });
-            if (validateLogSegmentSequenceNumber) {
-                FutureUtils.setException(promise, new 
DLIllegalStateException("Unexpected max log segment sequence number "
-                        + maxLogSegmentSequenceNo.getSequenceNumber() + " for 
" + getFullyQualifiedName()
-                        + ", expected " + (logSegmentSeqNo - 1)));
-                return;
-            }
-        }
-        // Prepare the completion
-        final String pathForCompletedLedger = completedLedgerZNode(firstTxId, 
lastTxId, logSegmentSeqNo);
-        long startSequenceId;
-        try {
-            startSequenceId = computeStartSequenceId(inprogressLogSegment);
-        } catch (IOException ioe) {
-            FutureUtils.setException(promise, ioe);
-            return;
-        }
-        // write completed ledger znode
-        final LogSegmentMetadata completedLogSegment =
-                inprogressLogSegment.completeLogSegment(
-                        pathForCompletedLedger,
-                        lastTxId,
-                        recordCount,
-                        lastEntryId,
-                        lastSlotId,
-                        startSequenceId);
-        // prepare the transaction
-        Transaction<Object> txn = streamMetadataStore.newTransaction();
-        // create completed log segment
-        writeLogSegment(txn, completedLogSegment);
-        // delete inprogress log segment
-        deleteLogSegment(txn, inprogressLogSegment);
-        // store max sequence number
-        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
-        // update max txn id.
-        LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", 
pathForCompletedLedger, lastTxId);
-        storeMaxTxId(txn, maxTxId, lastTxId);
-        txn.execute().addEventListener(FutureEventListenerRunnable.of(new 
FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-      "Completed {} to {} for {} : {}",
-                        new Object[] { inprogressZnodeName, 
-                                getFullyQualifiedName(), completedLogSegment 
-                FutureUtils.setValue(promise, completedLogSegment);
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-        }, scheduler));
-    }
-    public Future<Long> recoverIncompleteLogSegments() {
-        try {
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        return 
-    }
-    class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, 
Future<LogSegmentMetadata>> {
-        @Override
-        public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
-            if (!l.isInProgress()) {
-                return Future.value(l);
-            }
-  "Recovering last record in log segment {} for {}.", l, 
-            return asyncReadLastRecord(l, true, true, true).flatMap(
-                    new AbstractFunction1<LogRecordWithDLSN, 
Future<LogSegmentMetadata>>() {
-                        @Override
-                        public Future<LogSegmentMetadata> 
apply(LogRecordWithDLSN lastRecord) {
-                            return completeLogSegment(l, lastRecord);
-                        }
-                    });
-        }
-        private Future<LogSegmentMetadata> 
completeLogSegment(LogSegmentMetadata l,
LogRecordWithDLSN lastRecord) {
-  "Recovered last record in log segment {} for {}.", l, 
-            long endTxId = DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
-            int recordCount = 0;
-            long lastEntryId = -1;
-            long lastSlotId = -1;
-            if (null != lastRecord) {
-                endTxId = lastRecord.getTransactionId();
-                recordCount = lastRecord.getLastPositionWithinLogSegment();
-                lastEntryId = lastRecord.getDlsn().getEntryId();
-                lastSlotId = lastRecord.getDlsn().getSlotId();
-            }
-            if (endTxId == DistributedLogConstants.INVALID_TXID) {
-                LOG.error("Unrecoverable corruption has occurred in segment "
-                    + l.toString() + " at path " + l.getZkPath()
-                    + ". Unable to continue recovery.");
-                return Future.exception(new IOException("Unrecoverable 
-                    + " please check logs."));
-            } else if (endTxId == 
DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
-                // TODO: Empty ledger - Ideally we should just remove it?
-                endTxId = l.getFirstTxId();
-            }
-            Promise<LogSegmentMetadata> promise = new 
-            doCompleteAndCloseLogSegment(
-                    l.getZNodeName(),
-                    l.getLogSegmentSequenceNumber(),
-                    l.getLogSegmentId(),
-                    l.getFirstTxId(),
-                    endTxId,
-                    recordCount,
-                    lastEntryId,
-                    lastSlotId,
-                    promise);
-            return promise;
-        }
-    }
-    Future<List<LogSegmentMetadata>> 
setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
-        if (DLSN.InvalidDLSN == dlsn) {
-            List<LogSegmentMetadata> emptyList = new 
-            return Future.value(emptyList);
-        }
-        return 
-                new AbstractFunction1<List<LogSegmentMetadata>, 
Future<List<LogSegmentMetadata>>>() {
-                    @Override
-                    public Future<List<LogSegmentMetadata>> 
apply(List<LogSegmentMetadata> logSegments) {
-                        return 
setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
-                    }
-                });
-    }
-    private Future<List<LogSegmentMetadata>> 
setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
   final DLSN dlsn) {
-        LOG.debug("Setting truncation status on logs older than {} from {} for 
-                new Object[]{dlsn, logSegments, getFullyQualifiedName()});
-        List<LogSegmentMetadata> truncateList = new 
-        LogSegmentMetadata partialTruncate = null;
-"{}: Truncating log segments older than {}", 
getFullyQualifiedName(), dlsn);
-        for (int i = 0; i < logSegments.size(); i++) {
-            LogSegmentMetadata l = logSegments.get(i);
-            if (!l.isInProgress()) {
-                if (l.getLastDLSN().compareTo(dlsn) < 0) {
-                    LOG.debug("{}: Truncating log segment {} ", 
getFullyQualifiedName(), l);
-                    truncateList.add(l);
-                } else if (l.getFirstDLSN().compareTo(dlsn) < 0) {
-                    // Can be satisfied by at most one segment
-                    if (null != partialTruncate) {
-                        String logMsg = String.format("Potential metadata 
inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
-                        LOG.error(logMsg);
-                        return Future.exception(new 
-                    }
-          "{}: Partially truncating log segment {} older 
than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
-                    partialTruncate = l;
-                } else {
-                    break;
-                }
-            } else {
-                break;
-            }
-        }
-        return setLogSegmentTruncationStatus(truncateList, partialTruncate, 
-    }
-    private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> 
logSegments) {
-        if (logSegments.isEmpty()) {
-            return 0;
-        } else {
-            // we have to keep at least one completed log segment for sequence 
-            int numCandidateLogSegments = 0;
-            for (LogSegmentMetadata segment : logSegments) {
-                if (segment.isInProgress()) {
-                    break;
-                } else {
-                    ++numCandidateLogSegments;
-                }
-            }
-            return numCandidateLogSegments - 1;
-        }
-    }
-    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final 
long minTimestampToKeep) {
-        if (minTimestampToKeep >= Utils.nowInMillis()) {
-            return Future.exception(new IllegalArgumentException(
-                    "Invalid timestamp " + minTimestampToKeep + " to purge 
logs for " + getFullyQualifiedName()));
-        }
-        return 
-                new Function<List<LogSegmentMetadata>, 
Future<List<LogSegmentMetadata>>>() {
-            @Override
-            public Future<List<LogSegmentMetadata>> 
apply(List<LogSegmentMetadata> logSegments) {
-                List<LogSegmentMetadata> purgeList = new 
-                int numCandidates = 
-                for (int iterator = 0; iterator < numCandidates; iterator++) {
-                    LogSegmentMetadata l = logSegments.get(iterator);
-                    // When application explicitly truncates segments; 
timestamp based purge is
-                    // only used to cleanup log segments that have been marked 
for truncation
-                    if ((l.isTruncated() || 
!conf.getExplicitTruncationByApplication()) &&
-                        !l.isInProgress() && (l.getCompletionTime() < 
minTimestampToKeep)) {
-                        purgeList.add(l);
-                    } else {
-                        // stop truncating log segments if we find either an 
inprogress or a partially
-                        // truncated log segment
-                        break;
-                    }
-                }
-      "Deleting log segments older than {} for {} : {}",
-                        new Object[] { minTimestampToKeep, 
getFullyQualifiedName(), purgeList });
-                return deleteLogSegments(purgeList);
-            }
-        });
-    }
-    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long 
minTxIdToKeep) {
-        return 
-            new AbstractFunction1<List<LogSegmentMetadata>, 
Future<List<LogSegmentMetadata>>>() {
-                @Override
-                public Future<List<LogSegmentMetadata>> 
apply(List<LogSegmentMetadata> logSegments) {
-                    int numLogSegmentsToProcess;
-                    if (minTxIdToKeep < 0) {
-                        // we are deleting the log, we can remove whole log 
-                        numLogSegmentsToProcess = logSegments.size();
-                    } else {
-                        numLogSegmentsToProcess = 
-                    }
-                    List<LogSegmentMetadata> purgeList = 
-                    for (int iterator = 0; iterator < numLogSegmentsToProcess; 
iterator++) {
-                        LogSegmentMetadata l = logSegments.get(iterator);
-                        if ((minTxIdToKeep < 0) ||
-                            ((l.isTruncated() || 
!conf.getExplicitTruncationByApplication()) &&
-                            !l.isInProgress() && (l.getLastTxId() < 
minTxIdToKeep))) {
-                            purgeList.add(l);
-                        } else {
-                            // stop truncating log segments if we find either 
an inprogress or a partially
-                            // truncated log segment
-                            break;
-                        }
-                    }
-                    return deleteLogSegments(purgeList);
-                }
-            });
-    }
-    private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
-            final List<LogSegmentMetadata> truncateList,
-            LogSegmentMetadata partialTruncate,
-            DLSN minActiveDLSN) {
-        final List<LogSegmentMetadata> listToTruncate = 
Lists.newArrayListWithCapacity(truncateList.size() + 1);
-        final List<LogSegmentMetadata> listAfterTruncated = 
Lists.newArrayListWithCapacity(truncateList.size() + 1);
-        Transaction<Object> updateTxn = metadataUpdater.transaction();
-        for(LogSegmentMetadata l : truncateList) {
-            if (!l.isTruncated()) {
-                LogSegmentMetadata newSegment = 
metadataUpdater.setLogSegmentTruncated(updateTxn, l);
-                listToTruncate.add(l);
-                listAfterTruncated.add(newSegment);
-            }
-        }
-        if (null != partialTruncate && (partialTruncate.isNonTruncated() ||
-                (partialTruncate.isPartiallyTruncated() && 
(partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0)))) {
-            LogSegmentMetadata newSegment = 
-                    updateTxn, partialTruncate, minActiveDLSN);
-            listToTruncate.add(partialTruncate);
-            listAfterTruncated.add(newSegment);
-        }
-        return updateTxn.execute().map(new AbstractFunction1<Void, 
List<LogSegmentMetadata>>() {
-            @Override
-            public List<LogSegmentMetadata> apply(Void value) {
-                for (int i = 0; i < listToTruncate.size(); i++) {
-                    LogSegmentMetadata newSegment = listAfterTruncated.get(i);
-                    addLogSegmentToCache(newSegment.getSegmentName(), 
-                }
-                return listAfterTruncated;
-            }
-        });
-    }
-    private Future<List<LogSegmentMetadata>> deleteLogSegments(
-            final List<LogSegmentMetadata> logs) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), 
-        }
-        return FutureUtils.processList(logs,
-                new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() 
-            @Override
-            public Future<LogSegmentMetadata> apply(LogSegmentMetadata 
segment) {
-                return deleteLogSegment(segment);
-            }
-        }, scheduler);
-    }
-    private Future<LogSegmentMetadata> deleteLogSegment(
-            final LogSegmentMetadata ledgerMetadata) {
-"Deleting ledger {} for {}", ledgerMetadata, 
-        final Promise<LogSegmentMetadata> promise = new 
-        final Stopwatch stopwatch = Stopwatch.createStarted();
-        promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() 
-            @Override
-            public void onSuccess(LogSegmentMetadata segment) {
-            }
-            @Override
-            public void onFailure(Throwable cause) {
-            }
-        });
-        entryStore.deleteLogSegment(ledgerMetadata)
-                .addEventListener(new 
FutureEventListener<LogSegmentMetadata>() {
-            @Override
-            public void onFailure(Throwable cause) {
-                FutureUtils.setException(promise, cause);
-            }
-            @Override
-            public void onSuccess(LogSegmentMetadata segment) {
-                deleteLogSegmentMetadata(segment, promise);
-            }
-        });
-        return promise;
-    }
-    private void deleteLogSegmentMetadata(final LogSegmentMetadata 
-                                          final Promise<LogSegmentMetadata> 
promise) {
-        Transaction<Object> deleteTxn = metadataStore.transaction();
-        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new 
Transaction.OpListener<Void>() {
-            @Override
-            public void onCommit(Void r) {
-                // purge log segment
-                removeLogSegmentFromCache(segmentMetadata.getZNodeName());
-                promise.setValue(segmentMetadata);
-            }
-            @Override
-            public void onAbort(Throwable t) {
-                if (t instanceof LogSegmentNotFoundException) {
-                    // purge log segment
-                    removeLogSegmentFromCache(segmentMetadata.getZNodeName());
-                    promise.setValue(segmentMetadata);
-                    return;
-                } else {
-                    LOG.error("Couldn't purge {} for {}: with error {}",
-                            new Object[]{ segmentMetadata, 
getFullyQualifiedName(), t });
-                    promise.setException(t);
-                }
-            }
-        });
-        deleteTxn.execute();
-    }
-    @Override
-    public Future<Void> asyncClose() {
-        return Utils.closeSequence(scheduler,
-                lock,
-                logSegmentAllocator);
-    }
-    @Override
-    public Future<Void> asyncAbort() {
-        return asyncClose();
-    }
-    String completedLedgerZNodeName(long firstTxId, long lastTxId, long 
logSegmentSeqNo) {
-        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == 
conf.getLogSegmentNameVersion()) {
-            return String.format("%s_%018d", 
DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
-        } else {
-            return String.format("%s_%018d_%018d", 
-                    firstTxId, lastTxId);
-        }
-    }
-    /**
-     * Get the znode path for a finalize ledger
-     */
-    String completedLedgerZNode(long firstTxId, long lastTxId, long 
logSegmentSeqNo) {
-        return String.format("%s/%s", logMetadata.getLogSegmentsPath(),
-                completedLedgerZNodeName(firstTxId, lastTxId, 
-    }
-    /**
-     * Get the name of the inprogress znode.
-     *
-     * @return name of the inprogress znode.
-     */
-    String inprogressZNodeName(long logSegmentId, long firstTxId, long 
logSegmentSeqNo) {
-        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == 
conf.getLogSegmentNameVersion()) {
-            // Lots of the problems are introduced due to different inprogress 
names with same ledger sequence number.
-            return String.format("%s_%018d", 
DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
-        } else {
-            return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" 
+ Long.toString(firstTxId, 16);
-        }
-    }
-    /**
-     * Get the znode path for the inprogressZNode
-     */
-    String inprogressZNode(long logSegmentId, long firstTxId, long 
logSegmentSeqNo) {
-        return logMetadata.getLogSegmentsPath() + "/" + 
inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
-    }
-    String inprogressZNode(String inprogressZNodeName) {
-        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName;
-    }
diff --git 
deleted file mode 100644
index 308f42a..0000000
+++ /dev/null
@@ -1,276 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.IdleReaderException;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
- * Synchronous Log Reader based on {@link AsyncLogReader}
- */
-class BKSyncLogReader implements LogReader, AsyncNotification {
-    private final BKDistributedLogManager bkdlm;
-    private final BKLogReadHandler readHandler;
-    private final AtomicReference<IOException> readerException =
-            new AtomicReference<IOException>(null);
-    private final int maxReadAheadWaitTime;
-    private Promise<Void> closeFuture;
-    private final Optional<Long> startTransactionId;
-    private boolean positioned = false;
-    private Entry.Reader currentEntry = null;
-    // readahead reader
-    ReadAheadEntryReader readAheadReader = null;
-    // idle reader settings
-    private final boolean shouldCheckIdleReader;
-    private final int idleErrorThresholdMillis;
-    // Stats
-    private final Counter idleReaderError;
-    BKSyncLogReader(DistributedLogConfiguration conf,
-                    BKDistributedLogManager bkdlm,
-                    DLSN startDLSN,
-                    Optional<Long> startTransactionId,
-                    StatsLogger statsLogger) throws IOException {
-        this.bkdlm = bkdlm;
-        this.readHandler = bkdlm.createReadHandler(
-                Optional.<String>absent(),
-                this,
-                true);
-        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
-        this.idleErrorThresholdMillis = 
-        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && 
idleErrorThresholdMillis < Integer.MAX_VALUE;
-        this.startTransactionId = startTransactionId;
-        // start readahead
-        startReadAhead(startDLSN);
-        if (!startTransactionId.isPresent()) {
-            positioned = true;
-        }
-        // Stats
-        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
-        idleReaderError = 
-    }
-    private void startReadAhead(DLSN startDLSN) throws IOException {
-        readAheadReader = new ReadAheadEntryReader(
-                    bkdlm.getStreamName(),
-                    startDLSN,
-                    bkdlm.getConf(),
-                    readHandler,
-                    bkdlm.getReaderEntryStore(),
-                    bkdlm.getScheduler(),
-                    Ticker.systemTicker(),
-                    bkdlm.alertStatsLogger);
-        readHandler.registerListener(readAheadReader);
-        readHandler.asyncStartFetchLogSegments()
-                .map(new 
AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> 
logSegments) {
-                        readAheadReader.start(logSegments.getValue());
-                        return BoxedUnit.UNIT;
-                    }
-                });
-    }
-    @VisibleForTesting
-    ReadAheadEntryReader getReadAheadReader() {
-        return readAheadReader;
-    }
-    @VisibleForTesting
-    BKLogReadHandler getReadHandler() {
-        return readHandler;
-    }
-    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException 
-        Entry.Reader entry = null;
-        if (nonBlocking) {
-            return readAheadReader.getNextReadAheadEntry(0L, 
-        } else {
-            while (!readAheadReader.isReadAheadCaughtUp()
-                    && null == readerException.get()
-                    && null == entry) {
-                entry = 
-            }
-            if (null != entry) {
-                return entry;
-            }
-            // reader is caught up
-            if (readAheadReader.isReadAheadCaughtUp()
-                    && null == readerException.get()) {
-                entry = 
-            }
-            return entry;
-        }
-    }
-    private void markReaderAsIdle() throws IdleReaderException {
-        IdleReaderException ire = new IdleReaderException("Sync reader on 
stream "
-                + readHandler.getFullyQualifiedName()
-                + " is idle for more than " + idleErrorThresholdMillis + " 
-        readerException.compareAndSet(null, ire);
-        throw ire;
-    }
-    @Override
-    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
-            throws IOException {
-        if (null != readerException.get()) {
-            throw readerException.get();
-        }
-        LogRecordWithDLSN record = doReadNext(nonBlocking);
-        // no record is returned, check if the reader becomes idle
-        if (null == record && shouldCheckIdleReader) {
-            if (readAheadReader.getNumCachedEntries() <= 0 &&
-                    readAheadReader.isReaderIdle(idleErrorThresholdMillis, 
-                markReaderAsIdle();
-            }
-        }
-        return record;
-    }
-    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws 
IOException {
-        LogRecordWithDLSN record = null;
-        do {
-            // fetch one record until we don't find any entry available in the 
readahead cache
-            while (null == record) {
-                if (null == currentEntry) {
-                    currentEntry = readNextEntry(nonBlocking);
-                    if (null == currentEntry) {
-                        return null;
-                    }
-                }
-                record = currentEntry.nextRecord();
-                if (null == record) {
-                    currentEntry = null;
-                }
-            }
-            // check if we reached the end of stream
-            if (record.isEndOfStream()) {
-                EndOfStreamException eos = new EndOfStreamException("End of 
Stream Reached for "
-                        + readHandler.getFullyQualifiedName());
-                readerException.compareAndSet(null, eos);
-                throw eos;
-            }
-            // skip control records
-            if (record.isControl()) {
-                record = null;
-                continue;
-            }
-            if (!positioned) {
-                if (record.getTransactionId() < startTransactionId.get()) {
-                    record = null;
-                    continue;
-                } else {
-                    positioned = true;
-                    break;
-                }
-            } else {
-                break;
-            }
-        } while (true);
-        return record;
-    }
-    @Override
-    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, 
int numLogRecords)
-            throws IOException {
-        LinkedList<LogRecordWithDLSN> retList =
-                new LinkedList<LogRecordWithDLSN>();
-        int numRead = 0;
-        LogRecordWithDLSN record = readNext(nonBlocking);
-        while ((null != record)) {
-            retList.add(record);
-            numRead++;
-            if (numRead >= numLogRecords) {
-                break;
-            }
-            record = readNext(nonBlocking);
-        }
-        return retList;
-    }
-    @Override
-    public Future<Void> asyncClose() {
-        Promise<Void> closePromise;
-        synchronized (this) {
-            if (null != closeFuture) {
-                return closeFuture;
-            }
-            closeFuture = closePromise = new Promise<Void>();
-        }
-        readHandler.unregisterListener(readAheadReader);
-        readAheadReader.removeStateChangeNotification(this);
-        Utils.closeSequence(bkdlm.getScheduler(), true,
-                readAheadReader,
-                readHandler
-        ).proxyTo(closePromise);
-        return closePromise;
-    }
-    @Override
-    public void close() throws IOException {
-        FutureUtils.result(asyncClose());
-    }
-    //
-    // Notification From ReadHandler
-    //
-    @Override
-    public void notifyOnError(Throwable cause) {
-        if (cause instanceof IOException) {
-            readerException.compareAndSet(null, (IOException) cause);
-        } else {
-            readerException.compareAndSet(null, new IOException(cause));
-        }
-    }
-    @Override
-    public void notifyOnOperationComplete() {
-        // no-op
-    }
diff --git 
deleted file mode 100644
index b638020..0000000
+++ /dev/null
@@ -1,113 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.util.FutureUtils;
-import java.util.List;
-class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
-    public BKSyncLogWriter(DistributedLogConfiguration conf,
-                           DynamicDistributedLogConfiguration dynConf,
-                           BKDistributedLogManager bkdlm) {
-        super(conf, dynConf, bkdlm);
-    }
-    /**
-     * Write log records to the stream.
-     *
-     * @param record operation
-     */
-    @Override
-    public void write(LogRecord record) throws IOException {
-        getLedgerWriter(record.getTransactionId(), false).write(record);
-    }
-    /**
-     * Write edits logs operation to the stream.
-     *
-     * @param records list of records
-     */
-    @Override
-    @Deprecated
-    public int writeBulk(List<LogRecord> records) throws IOException {
-        return getLedgerWriter(records.get(0).getTransactionId(), 
-    }
-    /**
-     * Flushes all the data up to this point,
-     * adds the end of stream marker and marks the stream
-     * as read-only in the metadata. No appends to the
-     * stream will be allowed after this point
-     */
-    @Override
-    public void markEndOfStream() throws IOException {
-        FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, 
-        closeAndComplete();
-    }
-    /**
-     * All data that has been written to the stream so far will be flushed.
-     * New data can be still written to the stream while flush is ongoing.
-     */
-    @Override
-    public long setReadyToFlush() throws IOException {
-        checkClosedOrInError("setReadyToFlush");
-        long highestTransactionId = 0;
-        BKLogSegmentWriter writer = getCachedLogWriter();
-        if (null != writer) {
-            highestTransactionId = Math.max(highestTransactionId, 
-        }
-        return highestTransactionId;
-    }
-    /**
-     * Commit data that is already flushed.
-     * <p/>
-     * This API is optional as the writer implements a policy for 
automatically syncing
-     * the log records in the buffer. The buffered edits can be flushed when 
the buffer
-     * becomes full or a certain period of time is elapsed.
-     */
-    @Override
-    public long flushAndSync() throws IOException {
-        checkClosedOrInError("flushAndSync");
-        LOG.debug("FlushAndSync Started");
-        long highestTransactionId = 0;
-        BKLogSegmentWriter writer = getCachedLogWriter();
-        if (null != writer) {
-            highestTransactionId = Math.max(highestTransactionId, 
-            LOG.debug("FlushAndSync Completed");
-        } else {
-            LOG.debug("FlushAndSync Completed - Nothing to Flush");
-        }
-        return highestTransactionId;
-    }
-    /**
-     * Close the stream without necessarily flushing immediately.
-     * This may be called if the stream is in error such as after a
-     * previous write or close threw an exception.
-     */
-    @Override
-    public void abort() throws IOException {
-        super.abort();
-    }
diff --git 
deleted file mode 100644
index 4586602..0000000
+++ /dev/null
@@ -1,90 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import java.util.concurrent.TimeUnit;
-class BKTransmitPacket {
-    private final EntryBuffer recordSet;
-    private final long transmitTime;
-    private final Promise<Integer> transmitComplete;
-    BKTransmitPacket(EntryBuffer recordSet) {
-        this.recordSet = recordSet;
-        this.transmitTime = System.nanoTime();
-        this.transmitComplete = new Promise<Integer>();
-    }
-    EntryBuffer getRecordSet() {
-        return recordSet;
-    }
-    Promise<Integer> getTransmitFuture() {
-        return transmitComplete;
-    }
-    /**
-     * Complete the transmit with result code <code>transmitRc</code>.
-     * <p>It would notify all the waiters that are waiting via {@link 
#awaitTransmitComplete(long, TimeUnit)}
-     * or {@link #addTransmitCompleteListener(FutureEventListener)}.
-     *
-     * @param transmitResult
-     *          transmit result code.
-     */
-    public void notifyTransmitComplete(int transmitResult) {
-        transmitComplete.setValue(transmitResult);
-    }
-    /**
-     * Register a transmit complete listener.
-     * <p>The listener will be triggered with transmit result when transmit 
-     * The method should be non-blocking.
-     *
-     * @param transmitCompleteListener
-     *          listener on transmit completion
-     * @see #awaitTransmitComplete(long, TimeUnit)
-     */
-    void addTransmitCompleteListener(FutureEventListener<Integer> 
transmitCompleteListener) {
-        transmitComplete.addEventListener(transmitCompleteListener);
-    }
-    /**
-     * Await for the transmit to be complete
-     *
-     * @param timeout
-     *          wait timeout
-     * @param unit
-     *          wait timeout unit
-     */
-    int awaitTransmitComplete(long timeout, TimeUnit unit)
-        throws Exception {
-        return Await.result(transmitComplete,
-                Duration.fromTimeUnit(timeout, unit));
-    }
-    public long getTransmitTime() {
-        return transmitTime;
-    }

Reply via email to