http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
deleted file mode 100644
index df64505..0000000
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ /dev/null
@@ -1,1348 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.FlushException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
-import com.twitter.distributedlog.exceptions.WriteCancelledException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.feature.CoreFeatureKeys;
-import com.twitter.distributedlog.injector.FailureInjector;
-import com.twitter.distributedlog.injector.RandomDelayFailureInjector;
-import com.twitter.distributedlog.io.Buffer;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.io.CompressionUtils;
-import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentWriter;
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.distributedlog.stats.OpStatsListener;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.PermitLimiter;
-import com.twitter.distributedlog.util.SafeQueueingFuturePool;
-import com.twitter.distributedlog.util.SimplePermitLimiter;
-import com.twitter.distributedlog.util.Sizable;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
-
-/**
- * BookKeeper Based Log Segment Writer.
- *
- * Multiple log records are packed into a single bookkeeper
- * entry before sending it over the network. The fact that the log record 
entries
- * are complete in the bookkeeper entries means that each bookkeeper log entry
- * can be read as a complete edit log. This is useful for reading, as we don't
- * need to read through the entire log segment to get the last written entry.
- *
- * <h3>Metrics</h3>
- *
- * <ul>
- * <li> flush/periodic/{success,miss}: counters for periodic flushes.
- * <li> data/{success,miss}: counters for data transmits.
- * <li> transmit/packetsize: opstats. characteristics of packet size for 
transmits.
- * <li> control/success: counter of success transmit of control records
- * <li> seg_writer/write: opstats. latency characteristics of write operations 
in segment writer.
- * <li> seg_writer/add_complete/{callback,queued,deferred}: opstats. latency 
components of add completions.
- * <li> seg_writer/pendings: counter. the number of records pending by the 
segment writers.
- * <li> transmit/outstanding/requests: per stream gauge. the number of 
outstanding transmits each stream.
- * </ul>
- */
-class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, 
Sizable {
-    static final Logger LOG = 
LoggerFactory.getLogger(BKLogSegmentWriter.class);
-
-    private final String fullyQualifiedLogSegment;
-    private final String streamName;
-    private final int logSegmentMetadataVersion;
-    private BKTransmitPacket packetPrevious;
-    private Entry.Writer recordSetWriter;
-    private final AtomicInteger outstandingTransmits;
-    private final int transmissionThreshold;
-    protected final LogSegmentEntryWriter entryWriter;
-    private final CompressionCodec.Type compressionType;
-    private final ReentrantLock transmitLock = new ReentrantLock();
-    private final AtomicInteger transmitResult
-        = new AtomicInteger(BKException.Code.OK);
-    private final DistributedLock lock;
-    private final boolean isDurableWriteEnabled;
-    private DLSN lastDLSN = DLSN.InvalidDLSN;
-    private final long startTxId;
-    private long lastTxId = DistributedLogConstants.INVALID_TXID;
-    private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
-    private long outstandingBytes = 0;
-    private long numFlushesSinceRestart = 0;
-    private long numBytes = 0;
-    private long lastEntryId = Long.MIN_VALUE;
-    private long lastTransmitNanos = Long.MIN_VALUE;
-    private final int periodicKeepAliveMs;
-
-    // Indicates whether there are writes that have been successfully 
transmitted that would need
-    // a control record to be transmitted to make them visible to the readers 
by updating the last
-    // add confirmed
-    volatile private boolean controlFlushNeeded = false;
-    private boolean immediateFlushEnabled = false;
-    private int minDelayBetweenImmediateFlushMs = 0;
-    private Stopwatch lastTransmit;
-    private boolean streamEnded = false;
-    private final ScheduledFuture<?> periodicFlushSchedule;
-    private final ScheduledFuture<?> periodicKeepAliveSchedule;
-    final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = 
new AtomicReference<ScheduledFuture<?>>(null);
-    final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = 
new AtomicReference<ScheduledFuture<?>>(null);
-    final private AtomicReference<Exception> scheduledFlushException = new 
AtomicReference<Exception>(null);
-    private boolean enforceLock = true;
-    private Promise<Void> closeFuture = null;
-    private final boolean enableRecordCounts;
-    private int positionWithinLogSegment = 0;
-    private final long logSegmentSequenceNumber;
-    // Used only for values that *could* change (e.g. buffer size etc.)
-    private final DistributedLogConfiguration conf;
-    private final OrderedScheduler scheduler;
-
-    // stats
-    private final StatsLogger envelopeStatsLogger;
-    private final StatsLogger transmitOutstandingLogger;
-    private final Counter transmitDataSuccesses;
-    private final Counter transmitDataMisses;
-    private final Gauge<Number> transmitOutstandingGauge;
-    private final OpStatsLogger transmitDataPacketSize;
-    private final Counter transmitControlSuccesses;
-    private final Counter pFlushSuccesses;
-    private final Counter pFlushMisses;
-    private final OpStatsLogger writeTime;
-    private final OpStatsLogger addCompleteTime;
-    private final OpStatsLogger addCompleteQueuedTime;
-    private final OpStatsLogger addCompleteDeferredTime;
-    private final Counter pendingWrites;
-
-    // add complete processing
-    private final SafeQueueingFuturePool<Void> addCompleteFuturePool;
-
-    // Functions
-    private final AbstractFunction1<Integer, Future<Long>> 
GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC =
-            new AbstractFunction1<Integer, Future<Long>>() {
-                @Override
-                public Future<Long> apply(Integer transmitRc) {
-                    if (BKException.Code.OK == transmitRc) {
-                        return Future.value(getLastTxIdAcknowledged());
-                    } else {
-                        return Future.exception(new 
BKTransmitException("Failed to transmit entry", transmitRc));
-                    }
-                }
-            };
-    final AbstractFunction1<Long, Future<Long>> COMMIT_AFTER_FLUSH_FUNC =
-            new AbstractFunction1<Long, Future<Long>>() {
-                @Override
-                public Future<Long> apply(Long lastAckedTxId) {
-                    return commit();
-                }
-            };
-
-    private final AlertStatsLogger alertStatsLogger;
-    private final WriteLimiter writeLimiter;
-    private final FailureInjector writeDelayInjector;
-
-    /**
-     * Construct an edit log output stream which writes to a ledger.
-     */
-    protected BKLogSegmentWriter(String streamName,
-                                 String logSegmentName,
-                                 DistributedLogConfiguration conf,
-                                 int logSegmentMetadataVersion,
-                                 LogSegmentEntryWriter entryWriter,
-                                 DistributedLock lock, /** the lock needs to 
be acquired **/
-                                 long startTxId,
-                                 long logSegmentSequenceNumber,
-                                 OrderedScheduler scheduler,
-                                 StatsLogger statsLogger,
-                                 StatsLogger perLogStatsLogger,
-                                 AlertStatsLogger alertStatsLogger,
-                                 PermitLimiter globalWriteLimiter,
-                                 FeatureProvider featureProvider,
-                                 DynamicDistributedLogConfiguration dynConf)
-        throws IOException {
-        super();
-
-        // set up a write limiter
-        PermitLimiter streamWriteLimiter = null;
-        if (conf.getPerWriterOutstandingWriteLimit() < 0) {
-            streamWriteLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
-        } else {
-            Feature disableWriteLimitFeature = featureProvider.getFeature(
-                CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
-            streamWriteLimiter = new SimplePermitLimiter(
-                conf.getOutstandingWriteLimitDarkmode(),
-                conf.getPerWriterOutstandingWriteLimit(),
-                statsLogger.scope("streamWriteLimiter"),
-                false,
-                disableWriteLimitFeature);
-        }
-        this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, 
globalWriteLimiter);
-        this.alertStatsLogger = alertStatsLogger;
-        this.envelopeStatsLogger = 
BroadCastStatsLogger.masterslave(statsLogger, perLogStatsLogger);
-
-        StatsLogger flushStatsLogger = statsLogger.scope("flush");
-        StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
-        pFlushSuccesses = pFlushStatsLogger.getCounter("success");
-        pFlushMisses = pFlushStatsLogger.getCounter("miss");
-
-        // transmit
-        StatsLogger transmitDataStatsLogger = statsLogger.scope("data");
-        transmitDataSuccesses = transmitDataStatsLogger.getCounter("success");
-        transmitDataMisses = transmitDataStatsLogger.getCounter("miss");
-        StatsLogger transmitStatsLogger = statsLogger.scope("transmit");
-        transmitDataPacketSize =  
transmitStatsLogger.getOpStatsLogger("packetsize");
-        StatsLogger transmitControlStatsLogger = statsLogger.scope("control");
-        transmitControlSuccesses = 
transmitControlStatsLogger.getCounter("success");
-        StatsLogger segWriterStatsLogger = statsLogger.scope("seg_writer");
-        writeTime = segWriterStatsLogger.getOpStatsLogger("write");
-        addCompleteTime = 
segWriterStatsLogger.scope("add_complete").getOpStatsLogger("callback");
-        addCompleteQueuedTime = 
segWriterStatsLogger.scope("add_complete").getOpStatsLogger("queued");
-        addCompleteDeferredTime = 
segWriterStatsLogger.scope("add_complete").getOpStatsLogger("deferred");
-        pendingWrites = segWriterStatsLogger.getCounter("pending");
-
-        // outstanding transmit requests
-        transmitOutstandingLogger = 
perLogStatsLogger.scope("transmit").scope("outstanding");
-        transmitOutstandingGauge = new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-            @Override
-            public Number getSample() {
-                return outstandingTransmits.get();
-            }
-        };
-        transmitOutstandingLogger.registerGauge("requests", 
transmitOutstandingGauge);
-
-        outstandingTransmits = new AtomicInteger(0);
-        this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
-        this.streamName = streamName;
-        this.logSegmentMetadataVersion = logSegmentMetadataVersion;
-        this.entryWriter = entryWriter;
-        this.lock = lock;
-        this.lock.checkOwnershipAndReacquire();
-
-        final int configuredTransmissionThreshold = 
dynConf.getOutputBufferSize();
-        if (configuredTransmissionThreshold > MAX_LOGRECORDSET_SIZE) {
-            LOG.warn("Setting output buffer size {} greater than max 
transmission size {} for log segment {}",
-                new Object[] {configuredTransmissionThreshold, 
MAX_LOGRECORDSET_SIZE, fullyQualifiedLogSegment});
-            this.transmissionThreshold = MAX_LOGRECORDSET_SIZE;
-        } else {
-            this.transmissionThreshold = configuredTransmissionThreshold;
-        }
-        this.compressionType = 
CompressionUtils.stringToType(conf.getCompressionType());
-
-        this.logSegmentSequenceNumber = logSegmentSequenceNumber;
-        this.recordSetWriter = Entry.newEntry(
-                streamName,
-                Math.max(transmissionThreshold, 1024),
-                envelopeBeforeTransmit(),
-                compressionType,
-                envelopeStatsLogger);
-        this.packetPrevious = null;
-        this.startTxId = startTxId;
-        this.lastTxId = startTxId;
-        this.lastTxIdAcknowledged = startTxId;
-        this.enableRecordCounts = conf.getEnableRecordCounts();
-        this.immediateFlushEnabled = conf.getImmediateFlushEnabled();
-        this.isDurableWriteEnabled = dynConf.isDurableWriteEnabled();
-        this.scheduler = scheduler;
-
-        // Failure injection
-        if (conf.getEIInjectWriteDelay()) {
-            this.writeDelayInjector = new RandomDelayFailureInjector(dynConf);
-        } else {
-            this.writeDelayInjector = FailureInjector.NULL;
-        }
-
-        // If we are transmitting immediately (threshold == 0) and if immediate
-        // flush is enabled, we don't need the periodic flush task
-        final int configuredPeriodicFlushFrequency = 
dynConf.getPeriodicFlushFrequencyMilliSeconds();
-        if (!immediateFlushEnabled || (0 != this.transmissionThreshold)) {
-            int periodicFlushFrequency = configuredPeriodicFlushFrequency;
-            if (periodicFlushFrequency > 0 && scheduler != null) {
-                periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
-                        periodicFlushFrequency/2, periodicFlushFrequency/2, 
TimeUnit.MILLISECONDS);
-            } else {
-                periodicFlushSchedule = null;
-            }
-        } else {
-            // Min delay heuristic applies only when immediate flush is enabled
-            // and transmission threshold is zero
-            minDelayBetweenImmediateFlushMs = 
conf.getMinDelayBetweenImmediateFlushMs();
-            periodicFlushSchedule = null;
-        }
-        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
-        if (periodicKeepAliveMs > 0 && scheduler != null) {
-            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new 
Runnable() {
-                @Override
-                public void run() {
-                    keepAlive();
-                }
-            }, periodicKeepAliveMs, periodicKeepAliveMs, 
TimeUnit.MILLISECONDS);
-        } else {
-            periodicKeepAliveSchedule = null;
-        }
-
-        this.conf = conf;
-        if (null != scheduler) {
-            this.addCompleteFuturePool = new 
SafeQueueingFuturePool<Void>(scheduler.getFuturePool(streamName));
-        } else {
-            this.addCompleteFuturePool = null;
-        }
-        assert(!this.immediateFlushEnabled || (null != this.scheduler));
-        this.lastTransmit = Stopwatch.createStarted();
-    }
-
-    String getFullyQualifiedLogSegment() {
-        return fullyQualifiedLogSegment;
-    }
-
-    @VisibleForTesting
-    DistributedLock getLock() {
-        return this.lock;
-    }
-
-    @VisibleForTesting
-    FuturePool getFuturePool() {
-        if (null == scheduler) {
-            return null;
-        }
-        return scheduler.getFuturePool(streamName);
-    }
-
-    @VisibleForTesting
-    void setTransmitResult(int rc) {
-        transmitResult.set(rc);
-    }
-
-    @VisibleForTesting
-    protected final LogSegmentEntryWriter getEntryWriter() {
-        return this.entryWriter;
-    }
-
-    @Override
-    public long getLogSegmentId() {
-        return this.entryWriter.getLogSegmentId();
-    }
-
-    protected final long getLogSegmentSequenceNumber() {
-        return logSegmentSequenceNumber;
-    }
-
-    /**
-     * Get the start tx id of the log segment.
-     *
-     * @return start tx id of the log segment.
-     */
-    protected final long getStartTxId() {
-        return startTxId;
-    }
-
-    /**
-     * Get the last tx id that has been written to the log segment buffer but 
not committed yet.
-     *
-     * @return last tx id that has been written to the log segment buffer but 
not committed yet.
-     * @see #getLastTxIdAcknowledged()
-     */
-    synchronized long getLastTxId() {
-        return lastTxId;
-    }
-
-    /**
-     * Get the last tx id that has been acknowledged.
-     *
-     * @return last tx id that has been acknowledged.
-     * @see #getLastTxId()
-     */
-    synchronized long getLastTxIdAcknowledged() {
-        return lastTxIdAcknowledged;
-    }
-
-    /**
-     * Get the position-within-logsemgnet of the last written log record.
-     *
-     * @return position-within-logsegment of the last written log record.
-     */
-    synchronized int getPositionWithinLogSegment() {
-        return positionWithinLogSegment;
-    }
-
-    @VisibleForTesting
-    long getLastEntryId() {
-        return lastEntryId;
-    }
-
-    /**
-     * Get the last dlsn of the last acknowledged record.
-     *
-     * @return last dlsn of the last acknowledged record.
-     */
-    synchronized DLSN getLastDLSN() {
-        return lastDLSN;
-    }
-
-    @Override
-    public long size() {
-        return entryWriter.size();
-    }
-
-    private synchronized int getAverageTransmitSize() {
-        if (numFlushesSinceRestart > 0) {
-            long ret = numBytes/numFlushesSinceRestart;
-
-            if (ret < Integer.MIN_VALUE || ret > Integer.MAX_VALUE) {
-                throw new IllegalArgumentException
-                    (ret + " transmit size should never exceed max transmit 
size");
-            }
-            return (int) ret;
-        }
-
-        return 0;
-    }
-
-    private Entry.Writer newRecordSetWriter() {
-        return Entry.newEntry(
-                streamName,
-                Math.max(transmissionThreshold, getAverageTransmitSize()),
-                envelopeBeforeTransmit(),
-                compressionType,
-                envelopeStatsLogger);
-    }
-
-    private boolean envelopeBeforeTransmit() {
-        return 
LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion);
-    }
-
-    @Override
-    public Future<Void> asyncClose() {
-        return closeInternal(false);
-    }
-
-    @Override
-    public Future<Void> asyncAbort() {
-        return closeInternal(true);
-    }
-
-    private void flushAddCompletes() {
-        if (null != addCompleteFuturePool) {
-            addCompleteFuturePool.close();
-        }
-    }
-
-    private synchronized void abortPacket(BKTransmitPacket packet) {
-        long numRecords = 0;
-        if (null != packet) {
-            EntryBuffer recordSet = packet.getRecordSet();
-            numRecords = recordSet.getNumRecords();
-            int rc = transmitResult.get();
-            if (BKException.Code.OK == rc) {
-                rc = BKException.Code.InterruptedException;
-            }
-            Throwable reason = new WriteCancelledException(streamName, 
FutureUtils.transmitException(rc));
-            recordSet.abortTransmit(reason);
-        }
-        LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, 
numRecords);
-    }
-
-    private synchronized long getWritesPendingTransmit() {
-        if (null != recordSetWriter) {
-            return recordSetWriter.getNumRecords();
-        } else {
-            return 0;
-        }
-    }
-
-    private synchronized long getPendingAddCompleteCount() {
-        if (null != addCompleteFuturePool) {
-            return addCompleteFuturePool.size();
-        } else {
-            return 0;
-        }
-    }
-
-    private Future<Void> closeInternal(boolean abort) {
-        Promise<Void> closePromise;
-        synchronized (this) {
-            if (null != closeFuture) {
-                return closeFuture;
-            }
-            closePromise = closeFuture = new Promise<Void>();
-        }
-
-        AtomicReference<Throwable> throwExc = new 
AtomicReference<Throwable>(null);
-        closeInternal(abort, throwExc, closePromise);
-        return closePromise;
-    }
-
-    private void closeInternal(final boolean abort,
-                               final AtomicReference<Throwable> throwExc,
-                               final Promise<Void> closePromise) {
-        // clean stats resources
-        this.transmitOutstandingLogger.unregisterGauge("requests", 
transmitOutstandingGauge);
-        this.writeLimiter.close();
-
-        // Cancel the periodic keep alive schedule first
-        if (null != periodicKeepAliveSchedule) {
-            if (!periodicKeepAliveSchedule.cancel(false)) {
-                LOG.info("Periodic keepalive for log segment {} isn't 
cancelled.", getFullyQualifiedLogSegment());
-            }
-        }
-
-        // Cancel the periodic flush schedule first
-        // The task is allowed to exit gracefully
-        if (null != periodicFlushSchedule) {
-            // we don't need to care about the cancel result here. if the 
periodicl flush task couldn't
-            // be cancelled, it means that it is doing flushing. So following 
flushes would be synchronized
-            // to wait until background flush completed.
-            if (!periodicFlushSchedule.cancel(false)) {
-                LOG.info("Periodic flush for log segment {} isn't cancelled.", 
getFullyQualifiedLogSegment());
-            }
-        }
-
-        // If it is a normal close and the stream isn't in an error state, we 
attempt to flush any buffered data
-        if (!abort && !isLogSegmentInError()) {
-            this.enforceLock = false;
-            LOG.info("Flushing before closing log segment {}", 
getFullyQualifiedLogSegment());
-            flushAndCommit().addEventListener(new FutureEventListener<Long>() {
-                @Override
-                public void onSuccess(Long value) {
-                    abortTransmitPacketOnClose(abort, throwExc, closePromise);
-                }
-
-                @Override
-                public void onFailure(Throwable cause) {
-                    throwExc.set(cause);
-                    abortTransmitPacketOnClose(abort, throwExc, closePromise);
-                }
-            });
-        } else {
-            abortTransmitPacketOnClose(abort, throwExc, closePromise);
-        }
-
-    }
-
-    private void abortTransmitPacketOnClose(final boolean abort,
-                                            final AtomicReference<Throwable> 
throwExc,
-                                            final Promise<Void> closePromise) {
-        LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :" +
-                        " lastDLSN = {} outstandingTransmits = {} 
writesPendingTransmit = {} addCompletesPending = {}",
-                new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
-                        outstandingTransmits.get(), 
getWritesPendingTransmit(), getPendingAddCompleteCount()});
-
-        // Save the current packet to reset, leave a new empty packet to avoid 
a race with
-        // addCompleteDeferredProcessing.
-        final BKTransmitPacket packetPreviousSaved;
-        final BKTransmitPacket packetCurrentSaved;
-        synchronized (this) {
-            packetPreviousSaved = packetPrevious;
-            packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
-            recordSetWriter = newRecordSetWriter();
-        }
-
-        // Once the last packet been transmitted, apply any remaining promises 
asynchronously
-        // to avoid blocking close if bk client is slow for some reason.
-        if (null != packetPreviousSaved) {
-            packetPreviousSaved.addTransmitCompleteListener(new 
FutureEventListener<Integer>() {
-                @Override
-                public void onSuccess(Integer transmitResult) {
-                    flushAddCompletes();
-                    abortPacket(packetCurrentSaved);
-                }
-                @Override
-                public void onFailure(Throwable cause) {
-                    LOG.error("Unexpected error on transmit completion ", 
cause);
-                }
-            });
-        } else {
-            // In this case there are no pending add completes, but we still 
need to abort the
-            // current packet.
-            abortPacket(packetCurrentSaved);
-        }
-        closeLedgerOnClose(abort, throwExc, closePromise);
-    }
-
-    private void closeLedgerOnClose(final boolean abort,
-                                    final AtomicReference<Throwable> throwExc,
-                                    final Promise<Void> closePromise) {
-        // close the log segment if it isn't in error state, so all the 
outstanding addEntry(s) will callback.
-        if (null == throwExc.get() && !isLogSegmentInError()) {
-            // Synchronous closing the ledger handle, if we couldn't close a 
ledger handle successfully.
-            // we should throw the exception to #closeToFinalize, so it would 
fail completing a log segment.
-            entryWriter.asyncClose(new CloseCallback() {
-                @Override
-                public void closeComplete(int rc, LedgerHandle lh, Object ctx) 
{
-                    if (BKException.Code.OK != rc && 
BKException.Code.LedgerClosedException != rc) {
-                        if (!abort) {
-                            throwExc.set(new IOException("Failed to close 
ledger for " + fullyQualifiedLogSegment + " : " +
-                                    BKException.getMessage(rc)));
-                        }
-                    }
-                    completeClosePromise(abort, throwExc, closePromise);
-                }
-            }, null);
-        } else {
-            completeClosePromise(abort, throwExc, closePromise);
-        }
-    }
-
-    private void completeClosePromise(final boolean abort,
-                                      final AtomicReference<Throwable> 
throwExc,
-                                      final Promise<Void> closePromise) {
-        // If add entry failed because of closing ledger above, we don't need 
to fail the close operation
-        if (!abort && null == throwExc.get() && 
shouldFailCompleteLogSegment()) {
-            throwExc.set(new BKTransmitException("Closing an errored stream : 
", transmitResult.get()));
-        }
-
-        if (null == throwExc.get()) {
-            FutureUtils.setValue(closePromise, null);
-        } else {
-            FutureUtils.setException(closePromise, throwExc.get());
-        }
-    }
-
-    @Override
-    synchronized public void write(LogRecord record) throws IOException {
-        writeUserRecord(record);
-        flushIfNeeded();
-    }
-
-    @Override
-    synchronized public Future<DLSN> asyncWrite(LogRecord record) {
-        return asyncWrite(record, true);
-    }
-
    /**
     * Asynchronously append a record.
     *
     * Control records are never packed together with user records: the current
     * output buffer is transmitted before and after writing a control record.
     * User records are buffered and optionally flushed.
     *
     * @param record log record to append
     * @param flush whether to flush the output buffer after a user record
     * @return future satisfied with the record's DLSN, or an exception future
     *         if the write or flush failed; when durable writes are disabled
     *         the future carries {@code DLSN.InvalidDLSN} since no ack exists
     */
    synchronized public Future<DLSN> asyncWrite(LogRecord record, boolean flush) {
        Future<DLSN> result = null;
        try {
            if (record.isControl()) {
                // we don't pack control records with user records together
                // so transmit current output buffer if possible
                try {
                    transmit();
                } catch (IOException ioe) {
                    return Future.exception(new WriteCancelledException(fullyQualifiedLogSegment, ioe));
                }
                result = writeControlLogRecord(record);
                transmit();
            } else {
                result = writeUserRecord(record);
                if (!isDurableWriteEnabled) {
                    // we have no idea about the DLSN if durability is turned off.
                    result = Future.value(DLSN.InvalidDLSN);
                }
                if (flush) {
                    flushIfNeeded();
                }
            }
        } catch (IOException ioe) {
            // We may incorrectly report transmit failure here, but only if we happened to hit
            // packet/xmit size limit conditions AND fail flush above, which should happen rarely
            if (null != result) {
                LOG.error("Overriding first result with flush failure {}", result);
            }
            result = Future.exception(ioe);

            // Flush to ensure any prev. writes with flush=false are flushed despite failure.
            flushIfNeededNoThrow();
        }
        return result;
    }
-
    /**
     * Validate and buffer a user record.
     *
     * Rejects writes when the writer is closing, already errored, the stream
     * has ended, or the transaction id is out of range. On success the record
     * is handed to {@link #writeInternal(LogRecord)}; the record-count position
     * and outstanding-write accounting are rolled back if that throws.
     *
     * @param record user record to buffer
     * @return future satisfied when the record is acknowledged
     * @throws IOException on validation failure or buffering failure
     */
    synchronized private Future<DLSN> writeUserRecord(LogRecord record) throws IOException {
        if (null != closeFuture) {
            // writer is closing or closed; surface the same message a closed ledger would
            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(BKException.Code.LedgerClosedException));
        }

        if (BKException.Code.OK != transmitResult.get()) {
            // Failfast if the stream already encountered error with safe retry on the client
            throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResult.get()));
        }

        if (streamEnded) {
            throw new EndOfStreamException("Writing to a stream after it has been marked as completed");
        }

        if ((record.getTransactionId() < 0) ||
            (record.getTransactionId() == DistributedLogConstants.MAX_TXID)) {
            // MAX_TXID is reserved for the end-of-stream marker
            throw new TransactionIdOutOfOrderException(record.getTransactionId());
        }

        // Inject write delay if configured to do so
        writeDelayInjector.inject();

        // Will check write rate limits and throw if exceeded.
        writeLimiter.acquire();
        pendingWrites.inc();

        // The count represents the number of user records up to the
        // current record
        // Increment the record count only when writing a user log record
        // Internally generated log records don't increment the count
        // writeInternal will always set a count regardless of whether it was
        // incremented or not.
        Future<DLSN> future = null;
        try {
            // increment the position for the record to write
            // if the record is failed to write, it would be decremented.
            positionWithinLogSegment++;
            int numRecords = 1;
            if (record.isRecordSet()) {
                numRecords = LogRecordSet.numRecords(record);
            }
            future = writeInternal(record);
            // after the record (record set) is written, the position should be
            // moved for {numRecords}, but since we already moved the record by 1
            // so advance the position for other {numRecords - 1}.
            positionWithinLogSegment += (numRecords - 1);
        } catch (IOException ex) {
            // roll back accounting performed above so a failed record leaves
            // the writer in the same state as before the call
            writeLimiter.release();
            pendingWrites.dec();
            positionWithinLogSegment--;
            throw ex;
        }

        // Track outstanding requests and return the future.
        return future.ensure(new Function0<BoxedUnit>() {
            public BoxedUnit apply() {
                pendingWrites.dec();
                writeLimiter.release();
                return null;
            }
        });
    }
-
-    boolean isLogSegmentInError() {
-        return (transmitResult.get() != BKException.Code.OK);
-    }
-
-    boolean shouldFailCompleteLogSegment() {
-        return (transmitResult.get() != BKException.Code.OK) &&
-                (transmitResult.get() != 
BKException.Code.LedgerClosedException);
-    }
-
    /**
     * Buffer a record (user or control) into the current record set.
     *
     * Oversized records are rejected; if appending would exceed the maximum
     * entry size, the current buffer is transmitted first. The write lock is
     * re-checked before the record is accepted.
     *
     * @param record record to buffer
     * @return promise satisfied with the record's DLSN on transmit completion
     * @throws LogRecordTooLongException if the record exceeds MAX_LOGRECORD_SIZE
     * @throws LockingException if the writer lost its lock
     * @throws BKTransmitException if the writer is already in error state
     * @throws WriteException if enveloping the transmitted buffer failed
     * @throws InvalidEnvelopedEntryException if an invalid envelope was built
     */
    synchronized public Future<DLSN> writeInternal(LogRecord record)
            throws LogRecordTooLongException, LockingException, BKTransmitException,
                   WriteException, InvalidEnvelopedEntryException {
        int logRecordSize = record.getPersistentSize();

        if (logRecordSize > MAX_LOGRECORD_SIZE) {
            throw new LogRecordTooLongException(String.format(
                    "Log Record of size %d written when only %d is allowed",
                    logRecordSize, MAX_LOGRECORD_SIZE));
        }

        // If we will exceed the max number of bytes allowed per entry
        // initiate a transmit before accepting the new log record
        if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) {
            checkStateAndTransmit();
        }

        checkWriteLock();

        if (enableRecordCounts) {
            // Set the count here. The caller would appropriately increment it
            // if this log record is to be counted
            record.setPositionWithinLogSegment(positionWithinLogSegment);
        }

        Promise<DLSN> writePromise = new Promise<DLSN>();
        writePromise.addEventListener(new OpStatsListener<DLSN>(writeTime));
        recordSetWriter.writeRecord(record, writePromise);

        if (record.getTransactionId() < lastTxId) {
            // out-of-order txids are logged but not rejected here
            LOG.info("Log Segment {} TxId decreased Last: {} Record: {}",
                    new Object[] {fullyQualifiedLogSegment, lastTxId, record.getTransactionId()});
        }
        if (!record.isControl()) {
            // only update last tx id for user records
            lastTxId = record.getTransactionId();
            // 20 is presumably the per-record serialization overhead in bytes
            // on top of the payload — TODO confirm against the record format
            outstandingBytes += (20 + record.getPayload().length);
        }
        return writePromise;
    }
-
    /**
     * Buffer a synthetic control record carrying the current lastTxId.
     *
     * Used by flush paths that need to force a transmit when there is no new
     * user data to persist.
     *
     * @return promise satisfied when the control record is acknowledged
     */
    synchronized private Future<DLSN> writeControlLogRecord()
            throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
                   LockingException, LogRecordTooLongException {
        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.CONTROL_RECORD_CONTENT);
        controlRec.setControl();
        return writeControlLogRecord(controlRec);
    }
-
    /**
     * Buffer a caller-supplied control record.
     *
     * Thin wrapper over {@link #writeInternal(LogRecord)}; callers are
     * responsible for marking the record as a control record.
     *
     * @param record control record to buffer
     * @return promise satisfied when the record is acknowledged
     */
    synchronized private Future<DLSN> writeControlLogRecord(LogRecord record)
            throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
                   LockingException, LogRecordTooLongException {
        return writeInternal(record);
    }
-
    /**
     * We write a special log record that marks the end of the stream. Since this is the last
     * log record in the stream, it is marked with MAX_TXID. MAX_TXID also has the useful
     * side-effect of disallowing future startLogSegment calls through the MaxTxID check
     *
     * @throws IOException if the marker record cannot be buffered
     */
    synchronized private void writeEndOfStreamMarker() throws IOException {
        LogRecord endOfStreamRec = new LogRecord(DistributedLogConstants.MAX_TXID, "endOfStream".getBytes(UTF_8));
        endOfStreamRec.setEndOfStream();
        writeInternal(endOfStreamRec);
    }
-
    /**
     * Flushes all the data up to this point,
     * adds the end of stream marker and marks the stream
     * as read-only in the metadata. No appends to the
     * stream will be allowed after this point
     *
     * @return future satisfied with the last acknowledged transaction id
     *         once the marker has been flushed and committed
     */
    public Future<Long> markEndOfStream() {
        // only the marker write and the streamEnded flip need the writer lock;
        // flushAndCommit manages its own synchronization
        synchronized (this) {
            try {
                writeEndOfStreamMarker();
            } catch (IOException e) {
                return Future.exception(e);
            }
            streamEnded = true;
        }
        return flushAndCommit();
    }
-
-    /**
-     * Write bulk of records.
-     *
-     * (TODO: moved this method to log writer level)
-     *
-     * @param records list of records to write
-     * @return number of records that has been written
-     * @throws IOException when there is I/O errors during writing records.
-     */
-    synchronized public int writeBulk(List<LogRecord> records) throws 
IOException {
-        int numRecords = 0;
-        for (LogRecord r : records) {
-            write(r);
-            numRecords++;
-        }
-        return numRecords;
-    }
-
    /**
     * Failpoint hook invoked before a transmit; converts an injected failure
     * into a WriteException so tests can simulate a transmit-time error.
     *
     * @throws WriteException when the FP_TransmitBeforeAddEntry failpoint fires
     */
    private void checkStateBeforeTransmit() throws WriteException {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitBeforeAddEntry);
        } catch (IOException e) {
            throw new WriteException(streamName, "Fail transmit before adding entries");
        }
    }
-
    /**
     * Transmit the output buffer data to the backend.
     *
     * @return last txn id that already acknowledged
     * @throws BKTransmitException if the segment writer is already in error state
     * @throws LockingException if the segment writer lost lock before transmit
     * @throws WriteException if failed to create the envelope for the data to transmit
     * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
     */
    synchronized void checkStateAndTransmit()
            throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException {
        checkStateBeforeTransmit();
        transmit();
    }
-
    /**
     * Flush the current output buffer to the backend.
     *
     * If there is nothing buffered, waits on the previously transmitted
     * packet's future instead; with no previous packet either, completes
     * immediately with the last acknowledged transaction id.
     *
     * @return future satisfied with the last acknowledged transaction id
     *         after the transmit completes, or an exception future on failure
     */
    @Override
    public synchronized Future<Long> flush() {
        try {
            checkStateBeforeTransmit();
        } catch (WriteException e) {
            return Future.exception(e);
        }

        Future<Integer> transmitFuture;
        try {
            transmitFuture = transmit();
        } catch (BKTransmitException e) {
            return Future.exception(e);
        } catch (LockingException e) {
            return Future.exception(e);
        } catch (WriteException e) {
            return Future.exception(e);
        } catch (InvalidEnvelopedEntryException e) {
            return Future.exception(e);
        }

        if (null == transmitFuture) {
            // nothing buffered; fall back to the last in-flight packet, if any
            if (null != packetPrevious) {
                transmitFuture = packetPrevious.getTransmitFuture();
            }  else {
                return Future.value(getLastTxIdAcknowledged());
            }
        }

        return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
    }
-
-    @Override
-    public synchronized Future<Long> commit() {
-        // we don't pack control records with user records together
-        // so transmit current output buffer if possible
-        Future<Integer> transmitFuture;
-        try {
-            try {
-                transmitFuture = transmit();
-            } catch (IOException ioe) {
-                return Future.exception(ioe);
-            }
-            if (null == transmitFuture) {
-                writeControlLogRecord();
-                return flush();
-            }
-        } catch (IOException ioe) {
-            return Future.exception(ioe);
-        }
-        return 
transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
-    }
-
    /**
     * Flush the output buffer and then commit, chaining the two async stages.
     *
     * @return future satisfied with the last acknowledged transaction id
     */
    Future<Long> flushAndCommit() {
        return flush().flatMap(COMMIT_AFTER_FLUSH_FUNC);
    }
-
    /**
     * Best-effort variant of {@link #flushIfNeeded()}: failures are logged
     * rather than propagated. Used on error paths where a secondary flush
     * failure must not mask the primary exception.
     */
    void flushIfNeededNoThrow() {
        try {
            flushIfNeeded();
        } catch (IOException ioe) {
            LOG.error("Encountered exception while flushing log records to stream {}",
                fullyQualifiedLogSegment, ioe);
        }
    }
-
-    void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
-                                        final 
AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
-        final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - 
lastTransmit.elapsed(TimeUnit.MILLISECONDS));
-        final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
-        if ((null == scheduledFuture) || scheduledFuture.isDone()) {
-            scheduledFutureRef.set(scheduler.schedule(new Runnable() {
-                @Override
-                public void run() {
-                    synchronized(this) {
-                        scheduledFutureRef.set(null);
-                        try {
-                            callable.call();
-
-                            // Flush was successful or wasn't needed, the 
exception should be unset.
-                            scheduledFlushException.set(null);
-                        } catch (Exception exc) {
-                            scheduledFlushException.set(exc);
-                            LOG.error("Delayed flush failed", exc);
-                        }
-                    }
-                }
-            }, delayMs, TimeUnit.MILLISECONDS));
-        }
-    }
-
    /**
     * Flush the current packet if the outstanding bytes exceed the
     * transmission threshold.
     *
     * With no configured flush delay the transmit happens inline; otherwise a
     * delayed flush is scheduled and any failure from a previous scheduled
     * flush is surfaced to the caller as a FlushException.
     *
     * @throws FlushException if the last scheduled flush failed
     */
    void flushIfNeeded() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
            LockingException, FlushException {
        if (outstandingBytes > transmissionThreshold) {
            // If flush delay is disabled, flush immediately, else schedule appropriately.
            if (0 == minDelayBetweenImmediateFlushMs) {
                checkStateAndTransmit();
            } else {
                scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        checkStateAndTransmit();
                        return null;
                    }
                }, transmitSchedFutureRef);

                // Timing here is not very important--the last flush failed and we should
                // indicate this to the caller. The next flush may succeed and unset the
                // scheduledFlushException in which case the next write will succeed (if the caller
                // hasn't already closed the writer).
                if (scheduledFlushException.get() != null) {
                    throw new FlushException("Last flush encountered an error while writing data to the backend",
                        getLastTxId(), getLastTxIdAcknowledged(), scheduledFlushException.get());
                }
            }
        }
    }
-
-    private void checkWriteLock() throws LockingException {
-        try {
-            if 
(FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_WriteInternalLostLock))
 {
-                throw new LockingException("/failpoint/lockpath", "failpoint 
is simulating a lost lock"
-                        + getFullyQualifiedLogSegment());
-            }
-        } catch (IOException e) {
-            throw new LockingException("/failpoint/lockpath", "failpoint is 
simulating a lost lock for "
-                    + getFullyQualifiedLogSegment());
-        }
-        if (enforceLock) {
-            lock.checkOwnershipAndReacquire();
-        }
-    }
-
-    /**
-     * Transmit the current buffer to bookkeeper.
-     * Synchronised at the class. #write() and #setReadyToFlush()
-     * are never called at the same time.
-     *
-     * NOTE: This method should only throw known exceptions so that we don't 
accidentally
-     *       add new code that throws in an inappropriate place.
-     *
-     * @return a transmit future for caller to wait for transmit result if we 
transmit successfully,
-     *         null if no data to transmit
-     * @throws BKTransmitException if the segment writer is already in error 
state
-     * @throws LockingException if the segment writer lost lock before transmit
-     * @throws WriteException if failed to create the envelope for the data to 
transmit
-     * @throws InvalidEnvelopedEntryException when built an invalid enveloped 
entry
-     */
-    private Future<Integer> transmit()
-        throws BKTransmitException, LockingException, WriteException, 
InvalidEnvelopedEntryException {
-        EntryBuffer recordSetToTransmit;
-        transmitLock.lock();
-        try {
-            synchronized (this) {
-                checkWriteLock();
-                // If transmitResult is anything other than 
BKException.Code.OK, it means that the
-                // stream has encountered an error and cannot be written to.
-                if (!transmitResult.compareAndSet(BKException.Code.OK,
-                                                  BKException.Code.OK)) {
-                    LOG.error("Log Segment {} Trying to write to an errored 
stream; Error is {}",
-                              fullyQualifiedLogSegment,
-                              BKException.getMessage(transmitResult.get()));
-                    throw new BKTransmitException("Trying to write to an 
errored stream;"
-                                                          + " Error code : (" 
+ transmitResult.get()
-                                                          + ") " + 
BKException.getMessage(transmitResult.get()), transmitResult.get());
-                }
-
-                if (recordSetWriter.getNumRecords() == 0) {
-                    // Control flushes always have at least the control record 
to flush
-                    transmitDataMisses.inc();
-                    return null;
-                }
-
-                recordSetToTransmit = recordSetWriter;
-                recordSetWriter = newRecordSetWriter();
-                outstandingBytes = 0;
-
-                if (recordSetToTransmit.hasUserRecords()) {
-                    numBytes += recordSetToTransmit.getNumBytes();
-                    numFlushesSinceRestart++;
-                }
-            }
-
-            Buffer toSend;
-            try {
-                toSend = recordSetToTransmit.getBuffer();
-                
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
-            } catch (IOException e) {
-                if (e instanceof InvalidEnvelopedEntryException) {
-                    alertStatsLogger.raise("Invalid enveloped entry for 
segment {} : ", fullyQualifiedLogSegment, e);
-                }
-                LOG.error("Exception while enveloping entries for segment: {}",
-                          new Object[] {fullyQualifiedLogSegment}, e);
-                // If a write fails here, we need to set the transmit result 
to an error so that
-                // no future writes go through and violate ordering guarantees.
-                transmitResult.set(BKException.Code.WriteException);
-                if (e instanceof InvalidEnvelopedEntryException) {
-                    alertStatsLogger.raise("Invalid enveloped entry for 
segment {} : ", fullyQualifiedLogSegment, e);
-                    throw (InvalidEnvelopedEntryException) e;
-                } else {
-                    throw new WriteException(streamName, "Envelope Error");
-                }
-            }
-
-            synchronized (this) {
-                // update the transmit timestamp
-                lastTransmitNanos = MathUtils.nowInNano();
-
-                BKTransmitPacket packet = new 
BKTransmitPacket(recordSetToTransmit);
-                packetPrevious = packet;
-                entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
-                                          this, packet);
-
-                if (recordSetToTransmit.hasUserRecords()) {
-                    transmitDataSuccesses.inc();
-                } else {
-                    transmitControlSuccesses.inc();
-                }
-
-                lastTransmit.reset().start();
-                outstandingTransmits.incrementAndGet();
-                controlFlushNeeded = false;
-                return packet.getTransmitFuture();
-            }
-        } finally {
-            transmitLock.unlock();
-        }
-    }
-
-    /**
-     *  Checks if there is any data to transmit so that the periodic flush
-     *  task can determine if there is anything it needs to do
-     */
-    synchronized private boolean haveDataToTransmit() {
-        if (!transmitResult.compareAndSet(BKException.Code.OK, 
BKException.Code.OK)) {
-            // Even if there is data it cannot be transmitted, so effectively 
nothing to send
-            return false;
-        }
-
-        return (recordSetWriter.getNumRecords() > 0);
-    }
-
    /**
     * BookKeeper add callback fired when an asyncAddEntry issued by transmit()
     * completes.
     *
     * Updates ack state and either hands result processing to the
     * addCompleteFuturePool (async mode) or runs it inline (sync mode). In
     * the sync case the transmit-complete notification must precede deferred
     * processing to avoid deadlocks in downstream callbacks.
     *
     * @param rc BookKeeper result code for the add
     * @param handle ledger the entry was added to
     * @param entryId id assigned to the entry (-1 on failure)
     * @param ctx the BKTransmitPacket passed to asyncAddEntry
     */
    @Override
    public void addComplete(final int rc, LedgerHandle handle,
                            final long entryId, final Object ctx) {
        // the effective result may be overridden by the failpoint below
        final AtomicReference<Integer> effectiveRC = new AtomicReference<Integer>(rc);
        try {
            if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete)) {
                effectiveRC.set(BKException.Code.UnexpectedConditionException);
            }
        } catch (Exception exc) {
            effectiveRC.set(BKException.Code.UnexpectedConditionException);
        }

        // Sanity check to make sure we're receiving these callbacks in order.
        if (entryId > -1 && lastEntryId >= entryId) {
            LOG.error("Log segment {} saw out of order entry {} lastEntryId {}",
                new Object[] {fullyQualifiedLogSegment, entryId, lastEntryId});
        }
        lastEntryId = entryId;

        assert (ctx instanceof BKTransmitPacket);
        final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx;

        // Time from transmit until receipt of addComplete callback
        addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(
            System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS));

        if (BKException.Code.OK == rc) {
            EntryBuffer recordSet = transmitPacket.getRecordSet();
            if (recordSet.hasUserRecords()) {
                // advance the ack horizon only for user data
                synchronized (this) {
                    lastTxIdAcknowledged = Math.max(lastTxIdAcknowledged, recordSet.getMaxTxId());
                }
            }
        }

        if (null != addCompleteFuturePool) {
            // async mode: defer result processing to the future pool
            final Stopwatch queuedTime = Stopwatch.createStarted();
            addCompleteFuturePool.apply(new Function0<Void>() {
                public Void apply() {
                    final Stopwatch deferredTime = Stopwatch.createStarted();
                    addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
                    addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
                    addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS));
                    return null;
                }
                @Override
                public String toString() {
                    return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)",
                            fullyQualifiedLogSegment, entryId, rc);
                }
            }).addEventListener(new FutureEventListener<Void>() {
                @Override
                public void onSuccess(Void done) {
                }
                @Override
                public void onFailure(Throwable cause) {
                    LOG.error("addComplete processing failed for {} entry {} lastTxId {} rc {} with error",
                        new Object[] {fullyQualifiedLogSegment, entryId, transmitPacket.getRecordSet().getMaxTxId(), rc, cause});
                }
            });
            // Race condition if we notify before the addComplete is enqueued.
            transmitPacket.notifyTransmitComplete(effectiveRC.get());
            outstandingTransmits.getAndDecrement();
        } else {
            // Notify transmit complete must be called before deferred processing in the
            // sync case since otherwise callbacks in deferred processing may deadlock.
            transmitPacket.notifyTransmitComplete(effectiveRC.get());
            outstandingTransmits.getAndDecrement();
            addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
        }
    }
-
    /**
     * Second phase of addComplete handling: record the transmit result,
     * update stats and the last DLSN, complete or abort the transmitted
     * record set, and — on the first error — cancel all pending promises.
     *
     * @param transmitPacket packet whose transmit just completed
     * @param entryId entry id assigned by BookKeeper (-1 on failure)
     * @param rc effective result code for the transmit
     */
    private void addCompleteDeferredProcessing(final BKTransmitPacket transmitPacket,
                                               final long entryId,
                                               final int rc) {
        boolean cancelPendingPromises = false;
        EntryBuffer recordSet = transmitPacket.getRecordSet();
        synchronized (this) {
            if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
                // If this is the first time we are setting an error code in the transmitResult then
                // we must cancel pending promises; once this error has been set, more records will not
                // be enqueued; they will be failed with WriteException
                cancelPendingPromises = (BKException.Code.OK != rc);
            } else {
                LOG.warn("Log segment {} entryId {}: Tried to set transmit result to ({}) but is already ({})",
                    new Object[] {fullyQualifiedLogSegment, entryId, rc, transmitResult.get()});
            }

            if (transmitResult.get() != BKException.Code.OK) {
                if (recordSet.hasUserRecords()) {
                    transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes());
                }
            } else {
                // If we had data that we flushed then we need it to make sure that
                // background flush in the next pass will make the previous writes
                // visible by advancing the lastAck
                if (recordSet.hasUserRecords()) {
                    transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes());
                    controlFlushNeeded = true;
                    if (immediateFlushEnabled) {
                        if (0 == minDelayBetweenImmediateFlushMs) {
                            backgroundFlush(true);
                        } else {
                            scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
                                @Override
                                public Void call() throws Exception {
                                    backgroundFlush(true);
                                    return null;
                                }
                            }, immFlushSchedFutureRef);
                        }
                    }
                }
            }

            // update last dlsn before satisfying future
            if (BKException.Code.OK == transmitResult.get()) {
                DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
                        logSegmentSequenceNumber, entryId);
                if (recordSet.hasUserRecords()) {
                    if (null != lastDLSNInPacket && lastDLSN.compareTo(lastDLSNInPacket) < 0) {
                        lastDLSN = lastDLSNInPacket;
                    }
                }
            }
        }

        if (BKException.Code.OK == transmitResult.get()) {
            recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
        } else {
            recordSet.abortTransmit(FutureUtils.transmitException(transmitResult.get()));
        }

        if (cancelPendingPromises) {
            // Since the writer is in a bad state no more packets will be transmitted, and its safe to
            // assign a new empty packet. This is to avoid a race with closeInternal which may also
            // try to cancel the current packet;
            final BKTransmitPacket packetCurrentSaved;
            synchronized (this) {
                packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
                recordSetWriter = newRecordSetWriter();
            }
            packetCurrentSaved.getRecordSet().abortTransmit(
                    new WriteCancelledException(streamName,
                            FutureUtils.transmitException(transmitResult.get())));
        }
    }
-
    /**
     * Periodic-flush entry point (scheduled Runnable): performs a full
     * background flush, transmitting buffered data as well as control records.
     */
    @Override
    synchronized public void run()  {
        backgroundFlush(false);
    }
-
    /**
     * Perform a background flush, transmitting buffered data and, when needed,
     * a synthesized control record to advance the ack horizon.
     *
     * @param controlFlushOnly if true, only flush when a control flush is
     *        needed to make previously transmitted writes visible; do not
     *        transmit new data on its own
     */
    synchronized private void backgroundFlush(boolean controlFlushOnly)  {
        if (null != closeFuture) {
            // if the log segment is closing, skip any background flushing
            LOG.debug("Skip background flushing since log segment {} is closing.", getFullyQualifiedLogSegment());
            return;
        }
        try {
            boolean newData = haveDataToTransmit();

            if (controlFlushNeeded || (!controlFlushOnly && newData)) {
                // If we need this periodic transmit to persist previously written data but
                // there is no new data (which would cause the transmit to be skipped) generate
                // a control record
                if (!newData) {
                    writeControlLogRecord();
                }

                transmit();
                pFlushSuccesses.inc();
            } else {
                pFlushMisses.inc();
            }
        } catch (IOException exc) {
            // periodic flush is best-effort; the error state is recorded in
            // transmitResult by the transmit path itself
            LOG.error("Log Segment {}: Error encountered by the periodic flush", fullyQualifiedLogSegment, exc);
        }
    }
-
    /**
     * Send a keep-alive control record if nothing has been transmitted for at
     * least the configured keep-alive interval. Skipped while closing.
     */
    synchronized private void keepAlive() {
        if (null != closeFuture) {
            // if the log segment is closing, skip sending any keep alive records.
            LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
                    getFullyQualifiedLogSegment());
            return;
        }

        if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
            // a recent transmit already proved liveness; nothing to do
            return;
        }

        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
        controlRec.setControl();
        asyncWrite(controlRec);
    }
-
-}

Reply via email to