http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
deleted file mode 100644
index df64505..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ /dev/null
@@ -1,1348 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.EndOfStreamException;
-import com.twitter.distributedlog.exceptions.FlushException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
-import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
-import com.twitter.distributedlog.exceptions.WriteCancelledException;
-import com.twitter.distributedlog.exceptions.WriteException;
-import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
-import com.twitter.distributedlog.feature.CoreFeatureKeys;
-import com.twitter.distributedlog.injector.FailureInjector;
-import com.twitter.distributedlog.injector.RandomDelayFailureInjector;
-import com.twitter.distributedlog.io.Buffer;
-import com.twitter.distributedlog.io.CompressionCodec;
-import com.twitter.distributedlog.io.CompressionUtils;
-import com.twitter.distributedlog.lock.DistributedLock;
-import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
-import com.twitter.distributedlog.logsegment.LogSegmentWriter;
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
-import com.twitter.distributedlog.stats.OpStatsListener;
-import com.twitter.distributedlog.util.FailpointUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.PermitLimiter;
-import com.twitter.distributedlog.util.SafeQueueingFuturePool;
-import com.twitter.distributedlog.util.SimplePermitLimiter;
-import com.twitter.distributedlog.util.Sizable;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.FuturePool;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.feature.Feature;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
-
-/**
- * BookKeeper Based Log Segment Writer.
- *
- * Multiple log records are packed into a single bookkeeper
- * entry before sending it over the network. The fact that the log record entries
- * are complete in the bookkeeper entries means that each bookkeeper log entry
- * can be read as a complete edit log. This is useful for reading, as we don't
- * need to read through the entire log segment to get the last written entry.
- *
- * <h3>Metrics</h3>
- *
- * <ul>
- * <li> flush/periodic/{success,miss}: counters for periodic flushes.
- * <li> data/{success,miss}: counters for data transmits.
- * <li> transmit/packetsize: opstats. characteristics of packet size for transmits.
- * <li> control/success: counter of successful transmits of control records.
- * <li> seg_writer/write: opstats. latency characteristics of write operations in segment writer.
- * <li> seg_writer/add_complete/{callback,queued,deferred}: opstats. latency components of add completions.
- * <li> seg_writer/pendings: counter. the number of records pending by the segment writers.
- * <li> transmit/outstanding/requests: per stream gauge. the number of outstanding transmits for each stream.
- * </ul>
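- *
- * <h3>Usage sketch</h3>
- *
- * A minimal sketch of how a segment writer is driven, assuming the writer has
- * already been constructed by its owning log handler (the txid and payload
- * below are illustrative, not part of this class):
- *
- * <pre>
- * LogRecord record = new LogRecord(txid, payload);
- * Future&lt;DLSN&gt; writeFuture = writer.asyncWrite(record);
- * Future&lt;Long&gt; lastAckedTxId = writer.flushAndCommit();
- * </pre>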
- */
-class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Sizable {
- static final Logger LOG = LoggerFactory.getLogger(BKLogSegmentWriter.class);
-
- private final String fullyQualifiedLogSegment;
- private final String streamName;
- private final int logSegmentMetadataVersion;
- private BKTransmitPacket packetPrevious;
- private Entry.Writer recordSetWriter;
- private final AtomicInteger outstandingTransmits;
- private final int transmissionThreshold;
- protected final LogSegmentEntryWriter entryWriter;
- private final CompressionCodec.Type compressionType;
- private final ReentrantLock transmitLock = new ReentrantLock();
- private final AtomicInteger transmitResult
- = new AtomicInteger(BKException.Code.OK);
- private final DistributedLock lock;
- private final boolean isDurableWriteEnabled;
- private DLSN lastDLSN = DLSN.InvalidDLSN;
- private final long startTxId;
- private long lastTxId = DistributedLogConstants.INVALID_TXID;
- private long lastTxIdAcknowledged = DistributedLogConstants.INVALID_TXID;
- private long outstandingBytes = 0;
- private long numFlushesSinceRestart = 0;
- private long numBytes = 0;
- private long lastEntryId = Long.MIN_VALUE;
- private long lastTransmitNanos = Long.MIN_VALUE;
- private final int periodicKeepAliveMs;
-
- // Indicates whether there are writes that have been successfully transmitted that would need
- // a control record to be transmitted to make them visible to the readers by updating the last
- // add confirmed
- volatile private boolean controlFlushNeeded = false;
- private boolean immediateFlushEnabled = false;
- private int minDelayBetweenImmediateFlushMs = 0;
- private Stopwatch lastTransmit;
- private boolean streamEnded = false;
- private final ScheduledFuture<?> periodicFlushSchedule;
- private final ScheduledFuture<?> periodicKeepAliveSchedule;
- final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
- final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
- final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
- private boolean enforceLock = true;
- private Promise<Void> closeFuture = null;
- private final boolean enableRecordCounts;
- private int positionWithinLogSegment = 0;
- private final long logSegmentSequenceNumber;
- // Used only for values that *could* change (e.g. buffer size etc.)
- private final DistributedLogConfiguration conf;
- private final OrderedScheduler scheduler;
-
- // stats
- private final StatsLogger envelopeStatsLogger;
- private final StatsLogger transmitOutstandingLogger;
- private final Counter transmitDataSuccesses;
- private final Counter transmitDataMisses;
- private final Gauge<Number> transmitOutstandingGauge;
- private final OpStatsLogger transmitDataPacketSize;
- private final Counter transmitControlSuccesses;
- private final Counter pFlushSuccesses;
- private final Counter pFlushMisses;
- private final OpStatsLogger writeTime;
- private final OpStatsLogger addCompleteTime;
- private final OpStatsLogger addCompleteQueuedTime;
- private final OpStatsLogger addCompleteDeferredTime;
- private final Counter pendingWrites;
-
- // add complete processing
- private final SafeQueueingFuturePool<Void> addCompleteFuturePool;
-
- // Functions
- private final AbstractFunction1<Integer, Future<Long>> GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC =
- new AbstractFunction1<Integer, Future<Long>>() {
- @Override
- public Future<Long> apply(Integer transmitRc) {
- if (BKException.Code.OK == transmitRc) {
- return Future.value(getLastTxIdAcknowledged());
- } else {
- return Future.exception(new BKTransmitException("Failed to transmit entry", transmitRc));
- }
- }
- };
- final AbstractFunction1<Long, Future<Long>> COMMIT_AFTER_FLUSH_FUNC =
- new AbstractFunction1<Long, Future<Long>>() {
- @Override
- public Future<Long> apply(Long lastAckedTxId) {
- return commit();
- }
- };
-
- private final AlertStatsLogger alertStatsLogger;
- private final WriteLimiter writeLimiter;
- private final FailureInjector writeDelayInjector;
-
- /**
- * Construct an edit log output stream which writes to a ledger.
- */
- protected BKLogSegmentWriter(String streamName,
- String logSegmentName,
- DistributedLogConfiguration conf,
- int logSegmentMetadataVersion,
- LogSegmentEntryWriter entryWriter,
- DistributedLock lock, /** the lock needs to be acquired **/
- long startTxId,
- long logSegmentSequenceNumber,
- OrderedScheduler scheduler,
- StatsLogger statsLogger,
- StatsLogger perLogStatsLogger,
- AlertStatsLogger alertStatsLogger,
- PermitLimiter globalWriteLimiter,
- FeatureProvider featureProvider,
- DynamicDistributedLogConfiguration dynConf)
- throws IOException {
- super();
-
- // set up a write limiter
- PermitLimiter streamWriteLimiter = null;
- if (conf.getPerWriterOutstandingWriteLimit() < 0) {
- streamWriteLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
- } else {
- Feature disableWriteLimitFeature = featureProvider.getFeature(
- CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
- streamWriteLimiter = new SimplePermitLimiter(
- conf.getOutstandingWriteLimitDarkmode(),
- conf.getPerWriterOutstandingWriteLimit(),
- statsLogger.scope("streamWriteLimiter"),
- false,
- disableWriteLimitFeature);
- }
- this.writeLimiter = new WriteLimiter(streamName, streamWriteLimiter, globalWriteLimiter);
- this.alertStatsLogger = alertStatsLogger;
- this.envelopeStatsLogger = BroadCastStatsLogger.masterslave(statsLogger, perLogStatsLogger);
-
- StatsLogger flushStatsLogger = statsLogger.scope("flush");
- StatsLogger pFlushStatsLogger = flushStatsLogger.scope("periodic");
- pFlushSuccesses = pFlushStatsLogger.getCounter("success");
- pFlushMisses = pFlushStatsLogger.getCounter("miss");
-
- // transmit
- StatsLogger transmitDataStatsLogger = statsLogger.scope("data");
- transmitDataSuccesses = transmitDataStatsLogger.getCounter("success");
- transmitDataMisses = transmitDataStatsLogger.getCounter("miss");
- StatsLogger transmitStatsLogger = statsLogger.scope("transmit");
- transmitDataPacketSize = transmitStatsLogger.getOpStatsLogger("packetsize");
- StatsLogger transmitControlStatsLogger = statsLogger.scope("control");
- transmitControlSuccesses = transmitControlStatsLogger.getCounter("success");
- StatsLogger segWriterStatsLogger = statsLogger.scope("seg_writer");
- writeTime = segWriterStatsLogger.getOpStatsLogger("write");
- addCompleteTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("callback");
- addCompleteQueuedTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("queued");
- addCompleteDeferredTime = segWriterStatsLogger.scope("add_complete").getOpStatsLogger("deferred");
- pendingWrites = segWriterStatsLogger.getCounter("pending");
-
- // outstanding transmit requests
- transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
- transmitOutstandingGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
- @Override
- public Number getSample() {
- return outstandingTransmits.get();
- }
- };
- transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
-
- outstandingTransmits = new AtomicInteger(0);
- this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
- this.streamName = streamName;
- this.logSegmentMetadataVersion = logSegmentMetadataVersion;
- this.entryWriter = entryWriter;
- this.lock = lock;
- this.lock.checkOwnershipAndReacquire();
-
- final int configuredTransmissionThreshold = dynConf.getOutputBufferSize();
- if (configuredTransmissionThreshold > MAX_LOGRECORDSET_SIZE) {
- LOG.warn("Setting output buffer size {} greater than max transmission size {} for log segment {}",
- new Object[] {configuredTransmissionThreshold, MAX_LOGRECORDSET_SIZE, fullyQualifiedLogSegment});
- this.transmissionThreshold = MAX_LOGRECORDSET_SIZE;
- } else {
- this.transmissionThreshold = configuredTransmissionThreshold;
- }
- this.compressionType = CompressionUtils.stringToType(conf.getCompressionType());
-
- this.logSegmentSequenceNumber = logSegmentSequenceNumber;
- this.recordSetWriter = Entry.newEntry(
- streamName,
- Math.max(transmissionThreshold, 1024),
- envelopeBeforeTransmit(),
- compressionType,
- envelopeStatsLogger);
- this.packetPrevious = null;
- this.startTxId = startTxId;
- this.lastTxId = startTxId;
- this.lastTxIdAcknowledged = startTxId;
- this.enableRecordCounts = conf.getEnableRecordCounts();
- this.immediateFlushEnabled = conf.getImmediateFlushEnabled();
- this.isDurableWriteEnabled = dynConf.isDurableWriteEnabled();
- this.scheduler = scheduler;
-
- // Failure injection
- if (conf.getEIInjectWriteDelay()) {
- this.writeDelayInjector = new RandomDelayFailureInjector(dynConf);
- } else {
- this.writeDelayInjector = FailureInjector.NULL;
- }
-
- // If we are transmitting immediately (threshold == 0) and if immediate
- // flush is enabled, we don't need the periodic flush task
- final int configuredPeriodicFlushFrequency = dynConf.getPeriodicFlushFrequencyMilliSeconds();
- if (!immediateFlushEnabled || (0 != this.transmissionThreshold)) {
- int periodicFlushFrequency = configuredPeriodicFlushFrequency;
- if (periodicFlushFrequency > 0 && scheduler != null) {
- periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
- periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
- } else {
- periodicFlushSchedule = null;
- }
- } else {
- // Min delay heuristic applies only when immediate flush is enabled
- // and transmission threshold is zero
- minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
- periodicFlushSchedule = null;
- }
- this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
- if (periodicKeepAliveMs > 0 && scheduler != null) {
- periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- keepAlive();
- }
- }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
- } else {
- periodicKeepAliveSchedule = null;
- }
-
- this.conf = conf;
- if (null != scheduler) {
- this.addCompleteFuturePool = new SafeQueueingFuturePool<Void>(scheduler.getFuturePool(streamName));
- } else {
- this.addCompleteFuturePool = null;
- }
- assert(!this.immediateFlushEnabled || (null != this.scheduler));
- this.lastTransmit = Stopwatch.createStarted();
- }
-
- String getFullyQualifiedLogSegment() {
- return fullyQualifiedLogSegment;
- }
-
- @VisibleForTesting
- DistributedLock getLock() {
- return this.lock;
- }
-
- @VisibleForTesting
- FuturePool getFuturePool() {
- if (null == scheduler) {
- return null;
- }
- return scheduler.getFuturePool(streamName);
- }
-
- @VisibleForTesting
- void setTransmitResult(int rc) {
- transmitResult.set(rc);
- }
-
- @VisibleForTesting
- protected final LogSegmentEntryWriter getEntryWriter() {
- return this.entryWriter;
- }
-
- @Override
- public long getLogSegmentId() {
- return this.entryWriter.getLogSegmentId();
- }
-
- protected final long getLogSegmentSequenceNumber() {
- return logSegmentSequenceNumber;
- }
-
- /**
- * Get the start tx id of the log segment.
- *
- * @return start tx id of the log segment.
- */
- protected final long getStartTxId() {
- return startTxId;
- }
-
- /**
- * Get the last tx id that has been written to the log segment buffer but not committed yet.
- *
- * @return last tx id that has been written to the log segment buffer but not committed yet.
- * @see #getLastTxIdAcknowledged()
- */
- synchronized long getLastTxId() {
- return lastTxId;
- }
-
- /**
- * Get the last tx id that has been acknowledged.
- *
- * @return last tx id that has been acknowledged.
- * @see #getLastTxId()
- */
- synchronized long getLastTxIdAcknowledged() {
- return lastTxIdAcknowledged;
- }
-
- /**
- * Get the position-within-logsegment of the last written log record.
- *
- * @return position-within-logsegment of the last written log record.
- */
- synchronized int getPositionWithinLogSegment() {
- return positionWithinLogSegment;
- }
-
- @VisibleForTesting
- long getLastEntryId() {
- return lastEntryId;
- }
-
- /**
- * Get the last dlsn of the last acknowledged record.
- *
- * @return last dlsn of the last acknowledged record.
- */
- synchronized DLSN getLastDLSN() {
- return lastDLSN;
- }
-
- @Override
- public long size() {
- return entryWriter.size();
- }
-
- private synchronized int getAverageTransmitSize() {
- if (numFlushesSinceRestart > 0) {
- long ret = numBytes/numFlushesSinceRestart;
-
- if (ret < Integer.MIN_VALUE || ret > Integer.MAX_VALUE) {
- throw new IllegalArgumentException
- (ret + " transmit size should never exceed max transmit size");
- }
- return (int) ret;
- }
-
- return 0;
- }
-
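- // Sizing heuristic: start each new record set buffer at the larger of the
- // configured transmission threshold and the observed average transmit size,
- // so steady-state writes rarely need to grow the buffer.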
- private Entry.Writer newRecordSetWriter() {
- return Entry.newEntry(
- streamName,
- Math.max(transmissionThreshold, getAverageTransmitSize()),
- envelopeBeforeTransmit(),
- compressionType,
- envelopeStatsLogger);
- }
-
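- // Entries are enveloped (wrapped with a header and optionally compressed)
- // only when the log segment metadata version supports enveloped entries.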
- private boolean envelopeBeforeTransmit() {
- return LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion);
- }
-
- @Override
- public Future<Void> asyncClose() {
- return closeInternal(false);
- }
-
- @Override
- public Future<Void> asyncAbort() {
- return closeInternal(true);
- }
-
- private void flushAddCompletes() {
- if (null != addCompleteFuturePool) {
- addCompleteFuturePool.close();
- }
- }
-
- private synchronized void abortPacket(BKTransmitPacket packet) {
- long numRecords = 0;
- if (null != packet) {
- EntryBuffer recordSet = packet.getRecordSet();
- numRecords = recordSet.getNumRecords();
- int rc = transmitResult.get();
- if (BKException.Code.OK == rc) {
- rc = BKException.Code.InterruptedException;
- }
- Throwable reason = new WriteCancelledException(streamName, FutureUtils.transmitException(rc));
- recordSet.abortTransmit(reason);
- }
- LOG.info("Stream {} aborted {} writes", fullyQualifiedLogSegment, numRecords);
- }
-
- private synchronized long getWritesPendingTransmit() {
- if (null != recordSetWriter) {
- return recordSetWriter.getNumRecords();
- } else {
- return 0;
- }
- }
-
- private synchronized long getPendingAddCompleteCount() {
- if (null != addCompleteFuturePool) {
- return addCompleteFuturePool.size();
- } else {
- return 0;
- }
- }
-
- private Future<Void> closeInternal(boolean abort) {
- Promise<Void> closePromise;
- synchronized (this) {
- if (null != closeFuture) {
- return closeFuture;
- }
- closePromise = closeFuture = new Promise<Void>();
- }
-
- AtomicReference<Throwable> throwExc = new AtomicReference<Throwable>(null);
- closeInternal(abort, throwExc, closePromise);
- return closePromise;
- }
-
- private void closeInternal(final boolean abort,
- final AtomicReference<Throwable> throwExc,
- final Promise<Void> closePromise) {
- // clean stats resources
- this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
- this.writeLimiter.close();
-
- // Cancel the periodic keep alive schedule first
- if (null != periodicKeepAliveSchedule) {
- if (!periodicKeepAliveSchedule.cancel(false)) {
- LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
- }
- }
-
- // Cancel the periodic flush schedule first
- // The task is allowed to exit gracefully
- if (null != periodicFlushSchedule) {
- // we don't need to care about the cancel result here. if the periodic flush task couldn't
- // be cancelled, it means that it is doing flushing. So following flushes would be synchronized
- // to wait until the background flush completes.
- if (!periodicFlushSchedule.cancel(false)) {
- LOG.info("Periodic flush for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
- }
- }
-
- // If it is a normal close and the stream isn't in an error state, we attempt to flush any buffered data
- if (!abort && !isLogSegmentInError()) {
- this.enforceLock = false;
- LOG.info("Flushing before closing log segment {}", getFullyQualifiedLogSegment());
- flushAndCommit().addEventListener(new FutureEventListener<Long>() {
- @Override
- public void onSuccess(Long value) {
- abortTransmitPacketOnClose(abort, throwExc, closePromise);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- throwExc.set(cause);
- abortTransmitPacketOnClose(abort, throwExc, closePromise);
- }
- });
- } else {
- abortTransmitPacketOnClose(abort, throwExc, closePromise);
- }
-
- }
-
- private void abortTransmitPacketOnClose(final boolean abort,
- final AtomicReference<Throwable> throwExc,
- final Promise<Void> closePromise) {
- LOG.info("Closing BKPerStreamLogWriter (abort={}) for {} :" +
- " lastDLSN = {} outstandingTransmits = {} writesPendingTransmit = {} addCompletesPending = {}",
- new Object[]{abort, fullyQualifiedLogSegment, getLastDLSN(),
- outstandingTransmits.get(), getWritesPendingTransmit(), getPendingAddCompleteCount()});
-
- // Save the current packet to reset, leave a new empty packet to avoid a race with
- // addCompleteDeferredProcessing.
- final BKTransmitPacket packetPreviousSaved;
- final BKTransmitPacket packetCurrentSaved;
- synchronized (this) {
- packetPreviousSaved = packetPrevious;
- packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
- recordSetWriter = newRecordSetWriter();
- }
-
- // Once the last packet has been transmitted, apply any remaining promises asynchronously
- // to avoid blocking close if bk client is slow for some reason.
- if (null != packetPreviousSaved) {
- packetPreviousSaved.addTransmitCompleteListener(new FutureEventListener<Integer>() {
- @Override
- public void onSuccess(Integer transmitResult) {
- flushAddCompletes();
- abortPacket(packetCurrentSaved);
- }
- @Override
- public void onFailure(Throwable cause) {
- LOG.error("Unexpected error on transmit completion ", cause);
- }
- });
- } else {
- // In this case there are no pending add completes, but we still need to abort the
- // current packet.
- abortPacket(packetCurrentSaved);
- }
- closeLedgerOnClose(abort, throwExc, closePromise);
- }
-
- private void closeLedgerOnClose(final boolean abort,
- final AtomicReference<Throwable> throwExc,
- final Promise<Void> closePromise) {
- // close the log segment if it isn't in error state, so all the outstanding addEntry(s) will callback.
- if (null == throwExc.get() && !isLogSegmentInError()) {
- // Close the ledger handle; if we couldn't close the ledger handle successfully,
- // we should throw the exception to #closeToFinalize, so it would fail completing the log segment.
- entryWriter.asyncClose(new CloseCallback() {
- @Override
- public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
- if (BKException.Code.OK != rc && BKException.Code.LedgerClosedException != rc) {
- if (!abort) {
- throwExc.set(new IOException("Failed to close ledger for " + fullyQualifiedLogSegment + " : " +
- BKException.getMessage(rc)));
- }
- }
- completeClosePromise(abort, throwExc, closePromise);
- }
- }, null);
- } else {
- completeClosePromise(abort, throwExc, closePromise);
- }
- }
-
- private void completeClosePromise(final boolean abort,
- final AtomicReference<Throwable> throwExc,
- final Promise<Void> closePromise) {
- // If add entry failed because of closing ledger above, we don't need to fail the close operation
- if (!abort && null == throwExc.get() && shouldFailCompleteLogSegment()) {
- throwExc.set(new BKTransmitException("Closing an errored stream : ", transmitResult.get()));
- }
-
- if (null == throwExc.get()) {
- FutureUtils.setValue(closePromise, null);
- } else {
- FutureUtils.setException(closePromise, throwExc.get());
- }
- }
-
- @Override
- synchronized public void write(LogRecord record) throws IOException {
- writeUserRecord(record);
- flushIfNeeded();
- }
-
- @Override
- synchronized public Future<DLSN> asyncWrite(LogRecord record) {
- return asyncWrite(record, true);
- }
-
- synchronized public Future<DLSN> asyncWrite(LogRecord record, boolean flush) {
- Future<DLSN> result = null;
- try {
- if (record.isControl()) {
- // we don't pack control records with user records together
- // so transmit current output buffer if possible
- try {
- transmit();
- } catch (IOException ioe) {
- return Future.exception(new WriteCancelledException(fullyQualifiedLogSegment, ioe));
- }
- result = writeControlLogRecord(record);
- transmit();
- } else {
- result = writeUserRecord(record);
- if (!isDurableWriteEnabled) {
- // we have no idea about the DLSN if durability is turned off.
- result = Future.value(DLSN.InvalidDLSN);
- }
- if (flush) {
- flushIfNeeded();
- }
- }
- } catch (IOException ioe) {
- // We may incorrectly report transmit failure here, but only if we happened to hit
- // packet/xmit size limit conditions AND fail flush above, which should happen rarely
- if (null != result) {
- LOG.error("Overriding first result with flush failure {}", result);
- }
- result = Future.exception(ioe);
-
- // Flush to ensure any prev. writes with flush=false are flushed despite failure.
- flushIfNeededNoThrow();
- }
- return result;
- }
-
- synchronized private Future<DLSN> writeUserRecord(LogRecord record) throws IOException {
- if (null != closeFuture) {
- throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(BKException.Code.LedgerClosedException));
- }
-
- if (BKException.Code.OK != transmitResult.get()) {
- // Failfast if the stream already encountered an error, with safe retry on the client
- throw new WriteException(fullyQualifiedLogSegment, BKException.getMessage(transmitResult.get()));
- }
-
- if (streamEnded) {
- throw new EndOfStreamException("Writing to a stream after it has been marked as completed");
- }
-
- if ((record.getTransactionId() < 0) ||
- (record.getTransactionId() == DistributedLogConstants.MAX_TXID)) {
- throw new TransactionIdOutOfOrderException(record.getTransactionId());
- }
-
- // Inject write delay if configured to do so
- writeDelayInjector.inject();
-
- // Will check write rate limits and throw if exceeded.
- writeLimiter.acquire();
- pendingWrites.inc();
-
- // The count represents the number of user records up to the
- // current record
- // Increment the record count only when writing a user log record
- // Internally generated log records don't increment the count
- // writeInternal will always set a count regardless of whether it was
- // incremented or not.
- Future<DLSN> future = null;
- try {
- // increment the position for the record to write
- // if the record is failed to write, it would be decremented.
- positionWithinLogSegment++;
- int numRecords = 1;
- if (record.isRecordSet()) {
- numRecords = LogRecordSet.numRecords(record);
- }
- future = writeInternal(record);
- // after the record (record set) is written, the position should advance
- // by {numRecords}; since we already advanced it by 1 above, advance it
- // by the remaining {numRecords - 1}.
- positionWithinLogSegment += (numRecords - 1);
- } catch (IOException ex) {
- writeLimiter.release();
- pendingWrites.dec();
- positionWithinLogSegment--;
- throw ex;
- }
-
- // Track outstanding requests and return the future.
- return future.ensure(new Function0<BoxedUnit>() {
- public BoxedUnit apply() {
- pendingWrites.dec();
- writeLimiter.release();
- return null;
- }
- });
- }
-
- boolean isLogSegmentInError() {
- return (transmitResult.get() != BKException.Code.OK);
- }
-
- boolean shouldFailCompleteLogSegment() {
- return (transmitResult.get() != BKException.Code.OK) &&
- (transmitResult.get() != BKException.Code.LedgerClosedException);
- }
-
- synchronized public Future<DLSN> writeInternal(LogRecord record)
- throws LogRecordTooLongException, LockingException, BKTransmitException,
- WriteException, InvalidEnvelopedEntryException {
- int logRecordSize = record.getPersistentSize();
-
- if (logRecordSize > MAX_LOGRECORD_SIZE) {
- throw new LogRecordTooLongException(String.format(
- "Log Record of size %d written when only %d is allowed",
- logRecordSize, MAX_LOGRECORD_SIZE));
- }
-
- // If we will exceed the max number of bytes allowed per entry
- // initiate a transmit before accepting the new log record
- if ((recordSetWriter.getNumBytes() + logRecordSize) > MAX_LOGRECORDSET_SIZE) {
- checkStateAndTransmit();
- }
-
- checkWriteLock();
-
- if (enableRecordCounts) {
- // Set the count here. The caller would appropriately increment it
- // if this log record is to be counted
- record.setPositionWithinLogSegment(positionWithinLogSegment);
- }
-
- Promise<DLSN> writePromise = new Promise<DLSN>();
- writePromise.addEventListener(new OpStatsListener<DLSN>(writeTime));
- recordSetWriter.writeRecord(record, writePromise);
-
- if (record.getTransactionId() < lastTxId) {
- LOG.info("Log Segment {} TxId decreased Last: {} Record: {}",
- new Object[] {fullyQualifiedLogSegment, lastTxId, record.getTransactionId()});
- }
- if (!record.isControl()) {
- // only update last tx id for user records
- lastTxId = record.getTransactionId();
- outstandingBytes += (20 + record.getPayload().length);
- }
- return writePromise;
- }
-
- synchronized private Future<DLSN> writeControlLogRecord()
- throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
- LockingException, LogRecordTooLongException {
- LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.CONTROL_RECORD_CONTENT);
- controlRec.setControl();
- return writeControlLogRecord(controlRec);
- }
-
- synchronized private Future<DLSN> writeControlLogRecord(LogRecord record)
- throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
- LockingException, LogRecordTooLongException {
- return writeInternal(record);
- }
-
- /**
- * We write a special log record that marks the end of the stream. Since this is the last
- * log record in the stream, it is marked with MAX_TXID. MAX_TXID also has the useful
- * side-effect of disallowing future startLogSegment calls through the MaxTxID check
- *
- * @throws IOException
- */
- synchronized private void writeEndOfStreamMarker() throws IOException {
- LogRecord endOfStreamRec = new LogRecord(DistributedLogConstants.MAX_TXID, "endOfStream".getBytes(UTF_8));
- endOfStreamRec.setEndOfStream();
- writeInternal(endOfStreamRec);
- }
-
- /**
- * Flushes all the data up to this point,
- * adds the end of stream marker and marks the stream
- * as read-only in the metadata. No appends to the
- * stream will be allowed after this point
- */
- public Future<Long> markEndOfStream() {
- synchronized (this) {
- try {
- writeEndOfStreamMarker();
- } catch (IOException e) {
- return Future.exception(e);
- }
- streamEnded = true;
- }
- return flushAndCommit();
- }
-
- /**
- * Write bulk of records.
- *
- * (TODO: move this method to the log writer level)
- *
- * @param records list of records to write
- * @return number of records that have been written
- * @throws IOException when there is I/O errors during writing records.
- */
- synchronized public int writeBulk(List<LogRecord> records) throws IOException {
- int numRecords = 0;
- for (LogRecord r : records) {
- write(r);
- numRecords++;
- }
- return numRecords;
- }
-
- private void checkStateBeforeTransmit() throws WriteException {
- try {
- FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitBeforeAddEntry);
- } catch (IOException e) {
- throw new WriteException(streamName, "Fail transmit before adding entries");
- }
- }
-
- /**
- * Transmit the output buffer data to the backend.
- *
- * @return last txn id that already acknowledged
- * @throws BKTransmitException if the segment writer is already in error state
- * @throws LockingException if the segment writer lost lock before transmit
- * @throws WriteException if failed to create the envelope for the data to transmit
- * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
- */
- synchronized void checkStateAndTransmit()
- throws BKTransmitException, WriteException, InvalidEnvelopedEntryException, LockingException {
- checkStateBeforeTransmit();
- transmit();
- }
-
- @Override
- public synchronized Future<Long> flush() {
- try {
- checkStateBeforeTransmit();
- } catch (WriteException e) {
- return Future.exception(e);
- }
-
- Future<Integer> transmitFuture;
- try {
- transmitFuture = transmit();
- } catch (BKTransmitException e) {
- return Future.exception(e);
- } catch (LockingException e) {
- return Future.exception(e);
- } catch (WriteException e) {
- return Future.exception(e);
- } catch (InvalidEnvelopedEntryException e) {
- return Future.exception(e);
- }
-
- if (null == transmitFuture) {
- if (null != packetPrevious) {
- transmitFuture = packetPrevious.getTransmitFuture();
- } else {
- return Future.value(getLastTxIdAcknowledged());
- }
- }
-
- return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
- }
-
- @Override
- public synchronized Future<Long> commit() {
- // we don't pack control records with user records together
- // so transmit current output buffer if possible
- Future<Integer> transmitFuture;
- try {
- try {
- transmitFuture = transmit();
- } catch (IOException ioe) {
- return Future.exception(ioe);
- }
- if (null == transmitFuture) {
- writeControlLogRecord();
- return flush();
- }
- } catch (IOException ioe) {
- return Future.exception(ioe);
- }
- return transmitFuture.flatMap(GET_LAST_TXID_ACKNOWLEDGED_AFTER_TRANSMIT_FUNC);
- }
-
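- // flush() transmits whatever is buffered and completes with the last
- // acknowledged transaction id; commit() falls back to transmitting a
- // control record when there is nothing buffered, so that readers can
- // advance the last add confirmed.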
- Future<Long> flushAndCommit() {
- return flush().flatMap(COMMIT_AFTER_FLUSH_FUNC);
- }
-
- void flushIfNeededNoThrow() {
- try {
- flushIfNeeded();
- } catch (IOException ioe) {
- LOG.error("Encountered exception while flushing log records to stream {}",
- fullyQualifiedLogSegment, ioe);
- }
- }
-
- void scheduleFlushWithDelayIfNeeded(final Callable<?> callable,
- final AtomicReference<ScheduledFuture<?>> scheduledFutureRef) {
- final long delayMs = Math.max(0, minDelayBetweenImmediateFlushMs - lastTransmit.elapsed(TimeUnit.MILLISECONDS));
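- // e.g. with minDelayBetweenImmediateFlushMs = 20 and 8 ms elapsed since the
- // last transmit, the flush is scheduled 12 ms out; once the minimum delay
- // has already elapsed, delayMs clamps to 0 and the flush runs immediately.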
- final ScheduledFuture<?> scheduledFuture = scheduledFutureRef.get();
- if ((null == scheduledFuture) || scheduledFuture.isDone()) {
- scheduledFutureRef.set(scheduler.schedule(new Runnable() {
- @Override
- public void run() {
- synchronized(this) {
- scheduledFutureRef.set(null);
- try {
- callable.call();
-
- // Flush was successful or wasn't needed, the exception should be unset.
- scheduledFlushException.set(null);
- } catch (Exception exc) {
- scheduledFlushException.set(exc);
- LOG.error("Delayed flush failed", exc);
- }
- }
- }
- }, delayMs, TimeUnit.MILLISECONDS));
- }
- }
-
- // Based on transmit buffer size, immediate flush, etc., should we flush the current
- // packet now.
- void flushIfNeeded() throws BKTransmitException, WriteException, InvalidEnvelopedEntryException,
- LockingException, FlushException {
- if (outstandingBytes > transmissionThreshold) {
- // If flush delay is disabled, flush immediately, else schedule appropriately.
- if (0 == minDelayBetweenImmediateFlushMs) {
- checkStateAndTransmit();
- } else {
- scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- checkStateAndTransmit();
- return null;
- }
- }, transmitSchedFutureRef);
-
- // Timing here is not very important--the last flush failed and we should
- // indicate this to the caller. The next flush may succeed and unset the
- // scheduledFlushException in which case the next write will succeed (if the caller
- // hasn't already closed the writer).
- if (scheduledFlushException.get() != null) {
- throw new FlushException("Last flush encountered an error while writing data to the backend",
- getLastTxId(), getLastTxIdAcknowledged(), scheduledFlushException.get());
- }
- }
- }
- }
-
- private void checkWriteLock() throws LockingException {
- try {
- if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_WriteInternalLostLock)) {
- throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock for "
- + getFullyQualifiedLogSegment());
- }
- } catch (IOException e) {
- throw new LockingException("/failpoint/lockpath", "failpoint is simulating a lost lock for "
- + getFullyQualifiedLogSegment());
- }
- if (enforceLock) {
- lock.checkOwnershipAndReacquire();
- }
- }
-
- /**
- * Transmit the current buffer to bookkeeper.
- * Synchronised at the class level: #write() and #setReadyToFlush()
- * are never called at the same time.
- *
- * NOTE: This method should only throw known exceptions so that we don't accidentally
- * add new code that throws in an inappropriate place.
- *
- * @return a transmit future for caller to wait for transmit result if we transmit successfully,
- * null if no data to transmit
- * @throws BKTransmitException if the segment writer is already in error state
- * @throws LockingException if the segment writer lost lock before transmit
- * @throws WriteException if failed to create the envelope for the data to transmit
- * @throws InvalidEnvelopedEntryException when built an invalid enveloped entry
- */
- private Future<Integer> transmit()
- throws BKTransmitException, LockingException, WriteException, InvalidEnvelopedEntryException {
- EntryBuffer recordSetToTransmit;
- transmitLock.lock();
- try {
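- // transmitLock serializes entire transmits; the nested synchronized(this)
- // blocks below only cover the shared-state updates, so the buffer
- // enveloping in between happens outside the object monitor.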
- synchronized (this) {
- checkWriteLock();
- // If transmitResult is anything other than BKException.Code.OK, it means that the
- // stream has encountered an error and cannot be written to.
- if (!transmitResult.compareAndSet(BKException.Code.OK,
- BKException.Code.OK)) {
- LOG.error("Log Segment {} Trying to write to an errored stream; Error is {}",
- fullyQualifiedLogSegment,
- BKException.getMessage(transmitResult.get()));
- throw new BKTransmitException("Trying to write to an errored stream;"
- + " Error code : (" + transmitResult.get()
- + ") " + BKException.getMessage(transmitResult.get()), transmitResult.get());
- }
-
- if (recordSetWriter.getNumRecords() == 0) {
- // Control flushes always have at least the control record to flush
- transmitDataMisses.inc();
- return null;
- }
-
- recordSetToTransmit = recordSetWriter;
- recordSetWriter = newRecordSetWriter();
- outstandingBytes = 0;
-
- if (recordSetToTransmit.hasUserRecords()) {
- numBytes += recordSetToTransmit.getNumBytes();
- numFlushesSinceRestart++;
- }
- }
-
- Buffer toSend;
- try {
- toSend = recordSetToTransmit.getBuffer();
- FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitFailGetBuffer);
- } catch (IOException e) {
- if (e instanceof InvalidEnvelopedEntryException) {
- alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
- }
- LOG.error("Exception while enveloping entries for segment: {}",
- new Object[] {fullyQualifiedLogSegment}, e);
- // If a write fails here, we need to set the transmit result to an error so that
- // no future writes go through and violate ordering guarantees.
- transmitResult.set(BKException.Code.WriteException);
- if (e instanceof InvalidEnvelopedEntryException) {
- alertStatsLogger.raise("Invalid enveloped entry for segment {} : ", fullyQualifiedLogSegment, e);
- throw (InvalidEnvelopedEntryException) e;
- } else {
- throw new WriteException(streamName, "Envelope Error");
- }
- }
-
- synchronized (this) {
- // update the transmit timestamp
- lastTransmitNanos = MathUtils.nowInNano();
-
- BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
- packetPrevious = packet;
- entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
- this, packet);
-
- if (recordSetToTransmit.hasUserRecords()) {
- transmitDataSuccesses.inc();
- } else {
- transmitControlSuccesses.inc();
- }
-
- lastTransmit.reset().start();
- outstandingTransmits.incrementAndGet();
- controlFlushNeeded = false;
- return packet.getTransmitFuture();
- }
- } finally {
- transmitLock.unlock();
- }
- }
-
- /**
- * Checks if there is any data to transmit so that the periodic flush
- * task can determine if there is anything it needs to do
- */
- synchronized private boolean haveDataToTransmit() {
- if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) {
- // Even if there is data it cannot be transmitted, so effectively nothing to send
- return false;
- }
-
- return (recordSetWriter.getNumRecords() > 0);
- }
-
- @Override
- public void addComplete(final int rc, LedgerHandle handle,
- final long entryId, final Object ctx) {
- final AtomicReference<Integer> effectiveRC = new AtomicReference<Integer>(rc);
- try {
- if (FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_TransmitComplete)) {
- effectiveRC.set(BKException.Code.UnexpectedConditionException);
- }
- } catch (Exception exc) {
- effectiveRC.set(BKException.Code.UnexpectedConditionException);
- }
-
- // Sanity check to make sure we're receiving these callbacks in order.
- if (entryId > -1 && lastEntryId >= entryId) {
- LOG.error("Log segment {} saw out of order entry {} lastEntryId {}",
- new Object[] {fullyQualifiedLogSegment, entryId, lastEntryId});
- }
- lastEntryId = entryId;
-
- assert (ctx instanceof BKTransmitPacket);
- final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx;
-
- // Time from transmit until receipt of addComplete callback
- addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert(
- System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS));
-
- if (BKException.Code.OK == rc) {
- EntryBuffer recordSet = transmitPacket.getRecordSet();
- if (recordSet.hasUserRecords()) {
- synchronized (this) {
- lastTxIdAcknowledged = Math.max(lastTxIdAcknowledged, recordSet.getMaxTxId());
- }
- }
- }
-
- if (null != addCompleteFuturePool) {
- final Stopwatch queuedTime = Stopwatch.createStarted();
- addCompleteFuturePool.apply(new Function0<Void>() {
- public Void apply() {
- final Stopwatch deferredTime = Stopwatch.createStarted();
- addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS));
- addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
- addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS));
- return null;
- }
- @Override
- public String toString() {
- return String.format("AddComplete(Stream=%s, entryId=%d, rc=%d)",
- fullyQualifiedLogSegment, entryId, rc);
- }
- }).addEventListener(new FutureEventListener<Void>() {
- @Override
- public void onSuccess(Void done) {
- }
- @Override
- public void onFailure(Throwable cause) {
- LOG.error("addComplete processing failed for {} entry {} lastTxId {} rc {} with error",
- new Object[] {fullyQualifiedLogSegment, entryId, transmitPacket.getRecordSet().getMaxTxId(), rc, cause});
- }
- });
- // Race condition if we notify before the addComplete is enqueued.
- transmitPacket.notifyTransmitComplete(effectiveRC.get());
- outstandingTransmits.getAndDecrement();
- } else {
- // Notify transmit complete must be called before deferred processing in the
- // sync case since otherwise callbacks in deferred processing may deadlock.
- transmitPacket.notifyTransmitComplete(effectiveRC.get());
- outstandingTransmits.getAndDecrement();
- addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get());
- }
- }
-
- private void addCompleteDeferredProcessing(final BKTransmitPacket transmitPacket,
- final long entryId,
- final int rc) {
- boolean cancelPendingPromises = false;
- EntryBuffer recordSet = transmitPacket.getRecordSet();
- synchronized (this) {
- if (transmitResult.compareAndSet(BKException.Code.OK, rc)) {
- // If this is the first time we are setting an error code in the transmitResult then
- // we must cancel pending promises; once this error has been set, more records will not
- // be enqueued; they will be failed with WriteException
- cancelPendingPromises = (BKException.Code.OK != rc);
- } else {
- LOG.warn("Log segment {} entryId {}: Tried to set transmit result to ({}) but is already ({})",
- new Object[] {fullyQualifiedLogSegment, entryId, rc, transmitResult.get()});
- }
-
- if (transmitResult.get() != BKException.Code.OK) {
- if (recordSet.hasUserRecords()) {
- transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes());
- }
- } else {
- // If we had data that we flushed then we need to make sure that the
- // background flush in the next pass will make the previous writes
- // visible by advancing the lastAck
- if (recordSet.hasUserRecords()) {
- transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes());
- controlFlushNeeded = true;
- if (immediateFlushEnabled) {
- if (0 == minDelayBetweenImmediateFlushMs) {
- backgroundFlush(true);
- } else {
- scheduleFlushWithDelayIfNeeded(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- backgroundFlush(true);
- return null;
- }
- }, immFlushSchedFutureRef);
- }
- }
- }
- }
-
- // update last dlsn before satisfying the future
- if (BKException.Code.OK == transmitResult.get()) {
- DLSN lastDLSNInPacket = recordSet.finalizeTransmit(
- logSegmentSequenceNumber, entryId);
- if (recordSet.hasUserRecords()) {
- if (null != lastDLSNInPacket && lastDLSN.compareTo(lastDLSNInPacket) < 0) {
- lastDLSN = lastDLSNInPacket;
- }
- }
- }
- }
-
- if (BKException.Code.OK == transmitResult.get()) {
- recordSet.completeTransmit(logSegmentSequenceNumber, entryId);
- } else {
- recordSet.abortTransmit(FutureUtils.transmitException(transmitResult.get()));
- }
-
- if (cancelPendingPromises) {
- // Since the writer is in a bad state no more packets will be transmitted, and it's safe to
- // assign a new empty packet. This is to avoid a race with closeInternal, which may also
- // try to cancel the current packet.
- final BKTransmitPacket packetCurrentSaved;
- synchronized (this) {
- packetCurrentSaved = new BKTransmitPacket(recordSetWriter);
- recordSetWriter = newRecordSetWriter();
- }
- packetCurrentSaved.getRecordSet().abortTransmit(
- new WriteCancelledException(streamName,
- FutureUtils.transmitException(transmitResult.get())));
- }
- }
-
- @Override
- synchronized public void run() {
- backgroundFlush(false);
- }
-
- synchronized private void backgroundFlush(boolean controlFlushOnly) {
- if (null != closeFuture) {
- // if the log segment is closing, skip any background flushing
- LOG.debug("Skip background flushing since log segment {} is closing.", getFullyQualifiedLogSegment());
- return;
- }
- try {
- boolean newData = haveDataToTransmit();
-
- if (controlFlushNeeded || (!controlFlushOnly && newData)) {
- // If we need this periodic transmit to persist previously written data but
- // there is no new data (which would cause the transmit to be skipped), generate
- // a control record
- if (!newData) {
- writeControlLogRecord();
- }
-
- transmit();
- pFlushSuccesses.inc();
- } else {
- pFlushMisses.inc();
- }
- } catch (IOException exc) {
- LOG.error("Log Segment {}: Error encountered by the periodic flush", fullyQualifiedLogSegment, exc);
- }
- }
-
- synchronized private void keepAlive() {
- if (null != closeFuture) {
- // if the log segment is closing, skip sending any keep alive records.
- LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
- getFullyQualifiedLogSegment());
- return;
- }
-
- if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
- return;
- }
-
- LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
- controlRec.setControl();
- asyncWrite(controlRec);
- }
-
-}