http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java new file mode 100644 index 0000000..fdb29f3 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java @@ -0,0 +1,1325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.DLIllegalStateException; +import org.apache.distributedlog.exceptions.EndOfStreamException; +import org.apache.distributedlog.exceptions.LockingException; +import org.apache.distributedlog.exceptions.LogSegmentNotFoundException; +import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.function.GetLastTxIdFunction; +import org.apache.distributedlog.logsegment.LogSegmentEntryStore; +import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; +import org.apache.distributedlog.metadata.LogMetadataForWriter; +import org.apache.distributedlog.lock.DistributedLock; +import org.apache.distributedlog.logsegment.LogSegmentFilter; +import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; +import org.apache.distributedlog.logsegment.RollingPolicy; +import org.apache.distributedlog.logsegment.SizeBasedRollingPolicy; +import org.apache.distributedlog.logsegment.TimeBasedRollingPolicy; +import org.apache.distributedlog.metadata.LogStreamMetadataStore; +import org.apache.distributedlog.metadata.MetadataUpdater; +import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; +import org.apache.distributedlog.util.Allocator; +import org.apache.distributedlog.util.DLUtils; +import org.apache.distributedlog.util.FailpointUtils; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.Transaction; +import org.apache.distributedlog.util.PermitLimiter; +import 
org.apache.distributedlog.util.Utils; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER; + +/** + * Log Handler for Writers. + * + * <h3>Metrics</h3> + * All the metrics about log write handler are exposed under scope `segments`. + * <ul> + * <li> `segments`/open : opstats. latency characteristics on starting a new log segment. + * <li> `segments`/close : opstats. latency characteristics on completing an inprogress log segment. + * <li> `segments`/recover : opstats. latency characteristics on recovering a log segment. + * <li> `segments`/delete : opstats. latency characteristics on deleting a log segment. 
+ * </ul> + */ +class BKLogWriteHandler extends BKLogHandler { + static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); + + private static Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER = + new Transaction.OpListener<LogSegmentEntryWriter>() { + @Override + public void onCommit(LogSegmentEntryWriter r) { + // no-op + } + + @Override + public void onAbort(Throwable t) { + // no-op + } + }; + + protected final LogMetadataForWriter logMetadataForWriter; + protected final Allocator<LogSegmentEntryWriter, Object> logSegmentAllocator; + protected final DistributedLock lock; + protected final MaxTxId maxTxId; + protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo; + protected final boolean validateLogSegmentSequenceNumber; + protected final int regionId; + protected final RollingPolicy rollingPolicy; + protected Future<? extends DistributedLock> lockFuture = null; + protected final PermitLimiter writeLimiter; + protected final FeatureProvider featureProvider; + protected final DynamicDistributedLogConfiguration dynConf; + protected final MetadataUpdater metadataUpdater; + // tracking the inprogress log segments + protected final LinkedList<Long> inprogressLSSNs; + + // Fetch LogSegments State: write can continue without full list of log segments while truncation needs + private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite; + private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation; + + // Recover Functions + private final RecoverLogSegmentFunction recoverLogSegmentFunction = + new RecoverLogSegmentFunction(); + private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction = + new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() { + @Override + public Future<Long> apply(List<LogSegmentMetadata> segmentList) { + LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList); + // if lastLedgerRollingTimeMillis is not updated, we 
set it to now. + synchronized (BKLogWriteHandler.this) { + if (lastLedgerRollingTimeMillis < 0) { + lastLedgerRollingTimeMillis = Utils.nowInMillis(); + } + } + + if (validateLogSegmentSequenceNumber) { + synchronized (inprogressLSSNs) { + for (LogSegmentMetadata segment : segmentList) { + if (segment.isInProgress()) { + inprogressLSSNs.addLast(segment.getLogSegmentSequenceNumber()); + } + } + } + } + + return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map( + GetLastTxIdFunction.INSTANCE); + } + }; + + // Stats + private final StatsLogger perLogStatsLogger; + private final OpStatsLogger closeOpStats; + private final OpStatsLogger openOpStats; + private final OpStatsLogger recoverOpStats; + private final OpStatsLogger deleteOpStats; + + /** + * Construct a Bookkeeper journal manager. + */ + BKLogWriteHandler(LogMetadataForWriter logMetadata, + DistributedLogConfiguration conf, + LogStreamMetadataStore streamMetadataStore, + LogSegmentMetadataCache metadataCache, + LogSegmentEntryStore entryStore, + OrderedScheduler scheduler, + Allocator<LogSegmentEntryWriter, Object> segmentAllocator, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + AlertStatsLogger alertStatsLogger, + String clientId, + int regionId, + PermitLimiter writeLimiter, + FeatureProvider featureProvider, + DynamicDistributedLogConfiguration dynConf, + DistributedLock lock /** owned by handler **/) { + super(logMetadata, + conf, + streamMetadataStore, + metadataCache, + entryStore, + scheduler, + statsLogger, + alertStatsLogger, + clientId); + this.logMetadataForWriter = logMetadata; + this.logSegmentAllocator = segmentAllocator; + this.perLogStatsLogger = perLogStatsLogger; + this.writeLimiter = writeLimiter; + this.featureProvider = featureProvider; + this.dynConf = dynConf; + this.lock = lock; + this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); + + if (conf.getEncodeRegionIDInLogSegmentMetadata()) { + 
this.regionId = regionId; + } else { + this.regionId = DistributedLogConstants.LOCAL_REGION_ID; + } + this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled(); + + // Construct the max sequence no + maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData()); + inprogressLSSNs = new LinkedList<Long>(); + // Construct the max txn id. + maxTxId = new MaxTxId(logMetadata.getMaxTxIdData()); + + // Schedule fetching log segment list in background before we access it. + // We don't need to watch the log segment list changes for writer, as it manages log segment list. + fetchForWrite = readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + WRITE_HANDLE_FILTER, + null); + + // Initialize other parameters. + setLastLedgerRollingTimeMillis(Utils.nowInMillis()); + + // Rolling Policy + if (conf.getLogSegmentRollingIntervalMinutes() > 0) { + rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L); + } else { + rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes()); + } + + // Stats + StatsLogger segmentsStatsLogger = statsLogger.scope("segments"); + openOpStats = segmentsStatsLogger.getOpStatsLogger("open"); + closeOpStats = segmentsStatsLogger.getOpStatsLogger("close"); + recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover"); + deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete"); + } + + private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch( + final Comparator<LogSegmentMetadata> comparator) { + final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); + fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { + try { + FutureUtils.setValue(promise, 
getCachedLogSegments(comparator)); + } catch (UnexpectedException e) { + FutureUtils.setException(promise, e); + } + } + }); + return promise; + } + + private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch( + final Comparator<LogSegmentMetadata> comparator) { + Future<Versioned<List<LogSegmentMetadata>>> result; + synchronized (this) { + if (null == fetchForTruncation) { + fetchForTruncation = readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null); + } + result = fetchForTruncation; + } + + final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); + result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { + try { + FutureUtils.setValue(promise, getCachedLogSegments(comparator)); + } catch (UnexpectedException e) { + FutureUtils.setException(promise, e); + } + } + }); + return promise; + } + + // Transactional operations for MaxLogSegmentSequenceNo + void storeMaxSequenceNumber(final Transaction<Object> txn, + final MaxLogSegmentSequenceNo maxSeqNo, + final long seqNo, + final boolean isInprogress) { + metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo), + new Transaction.OpListener<Version>() { + @Override + public void onCommit(Version version) { + if (validateLogSegmentSequenceNumber) { + synchronized (inprogressLSSNs) { + if (isInprogress) { + inprogressLSSNs.add(seqNo); + } else { + inprogressLSSNs.removeFirst(); + } + } + } + maxSeqNo.update(version, seqNo); + } + + @Override + public void onAbort(Throwable t) { + // no-op + } + }); + } + + // Transactional operations for MaxTxId + void storeMaxTxId(final Transaction<Object> txn, + final MaxTxId maxTxId, + final long txId) { + 
metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId), + new Transaction.OpListener<Version>() { + @Override + public void onCommit(Version version) { + maxTxId.update(version, txId); + } + + @Override + public void onAbort(Throwable t) { + // no-op + } + }); + } + + // Transactional operations for logsegment + void writeLogSegment(final Transaction<Object> txn, + final LogSegmentMetadata metadata) { + metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() { + @Override + public void onCommit(Void r) { + addLogSegmentToCache(metadata.getSegmentName(), metadata); + } + + @Override + public void onAbort(Throwable t) { + // no-op + } + }); + } + + void deleteLogSegment(final Transaction<Object> txn, + final LogSegmentMetadata metadata) { + metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() { + @Override + public void onCommit(Void r) { + removeLogSegmentFromCache(metadata.getSegmentName()); + } + + @Override + public void onAbort(Throwable t) { + // no-op + } + }); + } + + /** + * The caller could call this before any actions, which to hold the lock for + * the write handler of its whole lifecycle. The lock will only be released + * when closing the write handler. + * + * This method is useful to prevent releasing underlying zookeeper lock during + * recovering/completing log segments. Releasing underlying zookeeper lock means + * 1) increase latency when re-lock on starting new log segment. 2) increase the + * possibility of a stream being re-acquired by other instances. + * + * @return future represents the lock result + */ + Future<? extends DistributedLock> lockHandler() { + if (null != lockFuture) { + return lockFuture; + } + lockFuture = lock.asyncAcquire(); + return lockFuture; + } + + Future<Void> unlockHandler() { + if (null != lockFuture) { + return lock.asyncClose(); + } else { + return Future.Void(); + } + } + + /** + * Start a new log segment in a BookKeeper ledger. 
+ * First ensure that we have the write lock for this journal. + * Then create a ledger and stream based on that ledger. + * The ledger id is written to the inprogress znode, so that in the + * case of a crash, a recovery process can find the ledger we were writing + * to when we crashed. + * + * @param txId First transaction id to be written to the stream + * @return + * @throws IOException + */ + public BKLogSegmentWriter startLogSegment(long txId) throws IOException { + return startLogSegment(txId, false, false); + } + + /** + * Start a new log segment in a BookKeeper ledger. + * First ensure that we have the write lock for this journal. + * Then create a ledger and stream based on that ledger. + * The ledger id is written to the inprogress znode, so that in the + * case of a crash, a recovery process can find the ledger we were writing + * to when we crashed. + * + * @param txId First transaction id to be written to the stream + * @param bestEffort + * @param allowMaxTxID + * allow using max tx id to start log segment + * @return + * @throws IOException + */ + public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) + throws IOException { + Stopwatch stopwatch = Stopwatch.createStarted(); + boolean success = false; + try { + BKLogSegmentWriter writer = doStartLogSegment(txId, bestEffort, allowMaxTxID); + success = true; + return writer; + } finally { + if (success) { + openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } else { + openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } + } + } + + protected long assignLogSegmentSequenceNumber() throws IOException { + // For any active stream we will always make sure that there is at least one + // active ledger (except when the stream first starts out). 
Therefore when we + // see no ledger metadata for a stream, we assume that this is the first ledger + // in the stream + long logSegmentSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO; + boolean logSegmentsFound = false; + + if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) { + List<LogSegmentMetadata> ledgerListDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); + Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc); + + if (null == nextLogSegmentSeqNo) { + logSegmentsFound = false; + // we don't find last assigned log segment sequence number + // then we start the log segment with configured FirstLogSegmentSequenceNumber. + logSegmentSeqNo = conf.getFirstLogSegmentSequenceNumber(); + } else { + logSegmentsFound = true; + // latest log segment is assigned with a sequence number, start with next sequence number + logSegmentSeqNo = nextLogSegmentSeqNo; + } + } + + // We only skip log segment sequence number validation only when no log segments found & + // the maximum log segment sequence number is "UNASSIGNED". + if (!logSegmentsFound && + (DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO == maxLogSegmentSequenceNo.getSequenceNumber())) { + // no ledger seqno stored in /ledgers before + LOG.info("No max ledger sequence number found while creating log segment {} for {}.", + logSegmentSeqNo, getFullyQualifiedName()); + } else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 != logSegmentSeqNo) { + LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}", + new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), getFullyQualifiedName(), + getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)}); + // there is max log segment number recorded there and it isn't match. throw exception. 
+ throw new DLIllegalStateException("Unexpected max log segment sequence number " + + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName() + + ", expected " + (logSegmentSeqNo - 1)); + } + + return logSegmentSeqNo; + } + + protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException { + return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); + } + + protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId, + final boolean bestEffort, + final boolean allowMaxTxID) { + final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>(); + try { + lock.checkOwnershipAndReacquire(); + } catch (LockingException e) { + FutureUtils.setException(promise, e); + return promise; + } + fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> list) { + doStartLogSegment(txId, bestEffort, allowMaxTxID, promise); + } + }); + return promise; + } + + protected void doStartLogSegment(final long txId, + final boolean bestEffort, + final boolean allowMaxTxID, + final Promise<BKLogSegmentWriter> promise) { + // validate the tx id + if ((txId < 0) || + (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) { + FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId)); + return; + } + + long highestTxIdWritten = maxTxId.get(); + if (txId < highestTxIdWritten) { + if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) { + LOG.error("We've already marked the stream as ended and attempting to start a new log segment"); + FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed")); + return; + } else { + LOG.error("We've already seen TxId {} the 
max TXId is {}", txId, highestTxIdWritten); + FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten)); + return; + } + } + + try { + logSegmentAllocator.allocate(); + } catch (IOException e) { + // failed to issue an allocation request + failStartLogSegment(promise, bestEffort, e); + return; + } + + // start the transaction from zookeeper + final Transaction<Object> txn = streamMetadataStore.newTransaction(); + + // failpoint injected before creating ledger + try { + FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate); + } catch (IOException ioe) { + failStartLogSegment(promise, bestEffort, ioe); + return; + } + + logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER) + .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() { + + @Override + public void onSuccess(LogSegmentEntryWriter entryWriter) { + // try-obtain succeed + createInprogressLogSegment( + txn, + txId, + entryWriter, + bestEffort, + promise); + } + + @Override + public void onFailure(Throwable cause) { + failStartLogSegment(promise, bestEffort, cause); + } + }); + } + + private void failStartLogSegment(Promise<BKLogSegmentWriter> promise, + boolean bestEffort, + Throwable cause) { + if (bestEffort) { + FutureUtils.setValue(promise, null); + } else { + FutureUtils.setException(promise, cause); + } + } + + // once the ledger handle is obtained from allocator, this function should guarantee + // either the transaction is executed or aborted. 
Otherwise, the ledger handle will + // just leak from the allocation pool - hence cause "No Ledger Allocator" + private void createInprogressLogSegment(Transaction<Object> txn, + final long txId, + final LogSegmentEntryWriter entryWriter, + boolean bestEffort, + final Promise<BKLogSegmentWriter> promise) { + final long logSegmentSeqNo; + try { + FailpointUtils.checkFailPoint( + FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber); + logSegmentSeqNo = assignLogSegmentSequenceNumber(); + } catch (IOException e) { + // abort the current prepared transaction + txn.abort(e); + failStartLogSegment(promise, bestEffort, e); + return; + } + + final String inprogressZnodePath = inprogressZNode( + entryWriter.getLogSegmentId(), txId, logSegmentSeqNo); + final LogSegmentMetadata l = + new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath, + conf.getDLLedgerMetadataLayoutVersion(), entryWriter.getLogSegmentId(), txId) + .setLogSegmentSequenceNo(logSegmentSeqNo) + .setRegionId(regionId) + .setEnvelopeEntries( + LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion())) + .build(); + + // Create an inprogress segment + writeLogSegment(txn, l); + + // Try storing max sequence number. + LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo); + storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, true); + + // Try storing max tx id. 
+ LOG.debug("Try storing MaxTxId in startLogSegment {} {}", inprogressZnodePath, txId); + storeMaxTxId(txn, maxTxId, txId); + + txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() { + + @Override + public void onSuccess(Void value) { + try { + FutureUtils.setValue(promise, new BKLogSegmentWriter( + getFullyQualifiedName(), + l.getSegmentName(), + conf, + conf.getDLLedgerMetadataLayoutVersion(), + entryWriter, + lock, + txId, + logSegmentSeqNo, + scheduler, + statsLogger, + perLogStatsLogger, + alertStatsLogger, + writeLimiter, + featureProvider, + dynConf)); + } catch (IOException ioe) { + failStartLogSegment(promise, false, ioe); + } + } + + @Override + public void onFailure(Throwable cause) { + failStartLogSegment(promise, false, cause); + } + }, scheduler)); + } + + boolean shouldStartNewSegment(BKLogSegmentWriter writer) { + return rollingPolicy.shouldRollover(writer, lastLedgerRollingTimeMillis); + } + + /** + * Finalize a log segment. If the journal manager is currently + * writing to a ledger, ensure that this is the ledger of the log segment + * being finalized. + * <p/> + * Otherwise this is the recovery case. In the recovery case, ensure that + * the firstTxId of the ledger matches firstTxId for the segment we are + * trying to finalize. + */ + Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) { + final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>(); + completeAndCloseLogSegment(writer, promise); + return promise; + } + + private void completeAndCloseLogSegment(final BKLogSegmentWriter writer, + final Promise<LogSegmentMetadata> promise) { + writer.asyncClose().addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + // in theory closeToFinalize should throw exception if a stream is in error. + // just in case, add another checking here to make sure we don't close log segment is a stream is in error. 
+ if (writer.shouldFailCompleteLogSegment()) { + FutureUtils.setException(promise, + new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error.")); + return; + } + doCompleteAndCloseLogSegment( + inprogressZNodeName(writer.getLogSegmentId(), writer.getStartTxId(), writer.getLogSegmentSequenceNumber()), + writer.getLogSegmentSequenceNumber(), + writer.getLogSegmentId(), + writer.getStartTxId(), + writer.getLastTxId(), + writer.getPositionWithinLogSegment(), + writer.getLastDLSN().getEntryId(), + writer.getLastDLSN().getSlotId(), + promise); + } + + @Override + public void onFailure(Throwable cause) { + FutureUtils.setException(promise, cause); + } + }); + } + + @VisibleForTesting + LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo, + long logSegmentId, + long firstTxId, + long lastTxId, + int recordCount) + throws IOException { + return completeAndCloseLogSegment(inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo), logSegmentSeqNo, + logSegmentId, firstTxId, lastTxId, recordCount, -1, -1); + } + + /** + * Finalize a log segment. If the journal manager is currently + * writing to a ledger, ensure that this is the ledger of the log segment + * being finalized. + * <p/> + * Otherwise this is the recovery case. In the recovery case, ensure that + * the firstTxId of the ledger matches firstTxId for the segment we are + * trying to finalize. 
+ */ + LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long logSegmentSeqNo, + long logSegmentId, long firstTxId, long lastTxId, + int recordCount, long lastEntryId, long lastSlotId) + throws IOException { + Stopwatch stopwatch = Stopwatch.createStarted(); + boolean success = false; + try { + LogSegmentMetadata completedLogSegment = + doCompleteAndCloseLogSegment(inprogressZnodeName, logSegmentSeqNo, + logSegmentId, firstTxId, lastTxId, recordCount, + lastEntryId, lastSlotId); + success = true; + return completedLogSegment; + } finally { + if (success) { + closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } else { + closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + } + } + } + + protected long computeStartSequenceId(LogSegmentMetadata segment) throws IOException { + if (!segment.isInProgress()) { + return segment.getStartSequenceId(); + } + + long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; + + // we only record sequence id when both write version and logsegment's version support sequence id + if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion()) + && segment.supportsSequenceId()) { + List<LogSegmentMetadata> logSegmentDescList = + getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); + startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment); + } + + return startSequenceId; + } + + /** + * Close log segment + * + * @param inprogressZnodeName + * @param logSegmentSeqNo + * @param logSegmentId + * @param firstTxId + * @param lastTxId + * @param recordCount + * @param lastEntryId + * @param lastSlotId + * @throws IOException + */ + protected LogSegmentMetadata doCompleteAndCloseLogSegment( + String inprogressZnodeName, + long logSegmentSeqNo, + long logSegmentId, + long firstTxId, + long lastTxId, + int recordCount, + long lastEntryId, + long lastSlotId) throws IOException { + 
        Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
        doCompleteAndCloseLogSegment(
            inprogressZnodeName,
            logSegmentSeqNo,
            logSegmentId,
            firstTxId,
            lastTxId,
            recordCount,
            lastEntryId,
            lastSlotId,
            promise);
        // block the caller until the async completion finishes (or fails)
        return FutureUtils.result(promise);
    }

    /**
     * Asynchronously complete and close an inprogress log segment.
     * <p>Waits for the in-flight log segment list fetch ({@code fetchForWrite}) to finish first,
     * so the cached segment list is populated before validation; any fetch failure fails the
     * returned promise directly.
     *
     * @param inprogressZnodeName name of the inprogress log segment znode
     * @param logSegmentSeqNo sequence number of the log segment being completed
     * @param logSegmentId id of the log segment being completed
     * @param firstTxId first transaction id recorded in the segment
     * @param lastTxId last transaction id recorded in the segment
     * @param recordCount number of records in the segment
     * @param lastEntryId entry id of the last entry in the segment
     * @param lastSlotId slot id of the last record in the segment
     * @param promise satisfied with the completed segment metadata, or the failure cause
     */
    protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
                                                final long logSegmentSeqNo,
                                                final long logSegmentId,
                                                final long firstTxId,
                                                final long lastTxId,
                                                final int recordCount,
                                                final long lastEntryId,
                                                final long lastSlotId,
                                                final Promise<LogSegmentMetadata> promise) {
        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.setException(promise, cause);
            }

            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
                    inprogressZnodeName,
                    logSegmentSeqNo,
                    logSegmentId,
                    firstTxId,
                    lastTxId,
                    recordCount,
                    lastEntryId,
                    lastSlotId,
                    promise);
            }
        });
    }

    /**
     * Second phase of segment completion, entered once the segment list fetch has completed:
     * validates the cached inprogress segment against the supplied identifiers, then atomically
     * (in one metadata transaction) writes the completed segment, deletes the inprogress one,
     * and stores the max sequence number and max transaction id.
     */
    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
            final String inprogressZnodeName,
            long logSegmentSeqNo,
            long logSegmentId,
            long firstTxId,
            long lastTxId,
            int recordCount,
            long lastEntryId,
            long lastSlotId,
            final Promise<LogSegmentMetadata> promise) {
        // ensure we still own the write lock before mutating metadata
        try {
            lock.checkOwnershipAndReacquire();
        } catch (IOException ioe) {
            FutureUtils.setException(promise, ioe);
            return;
        }

        LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
        LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);

        // validate log segment
        if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
            FutureUtils.setException(promise, new IOException(
                "Active ledger has different ID to inprogress. "
                    + inprogressLogSegment.getLogSegmentId() + " found, "
                    + logSegmentId + " expected"));
            return;
        }
        // validate the transaction id
        if (inprogressLogSegment.getFirstTxId() != firstTxId) {
            FutureUtils.setException(promise, new IOException("Transaction id not as expected, "
                + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
            return;
        }
        // validate the log sequence number
        if (validateLogSegmentSequenceNumber) {
            synchronized (inprogressLSSNs) {
                if (inprogressLSSNs.isEmpty()) {
                    FutureUtils.setException(promise, new UnexpectedException(
                        "Didn't find matched inprogress log segments when completing inprogress "
                            + inprogressLogSegment));
                    return;
                }
                long leastInprogressLSSN = inprogressLSSNs.getFirst();
                // the log segment sequence number in metadata {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
                // should be same as the sequence number we are completing (logSegmentSeqNo)
                // and
                // it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs}
                if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
                    (leastInprogressLSSN != logSegmentSeqNo)) {
                    FutureUtils.setException(promise, new UnexpectedException(
                        "Didn't find matched inprogress log segments when completing inprogress "
                            + inprogressLogSegment));
                    return;
                }
            }
        }

        // store max sequence number.
        long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
        if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo ||
            (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
            // ignore the case that a new inprogress log segment is pre-allocated
            // before completing current inprogress one
            LOG.info("Try storing max sequence number {} in completing {}.",
                new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
        } else {
            LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
            if (validateLogSegmentSequenceNumber) {
                FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
                    + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
                    + ", expected " + (logSegmentSeqNo - 1)));
                return;
            }
        }

        // Prepare the completion
        final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
        long startSequenceId;
        try {
            startSequenceId = computeStartSequenceId(inprogressLogSegment);
        } catch (IOException ioe) {
            FutureUtils.setException(promise, ioe);
            return;
        }
        // write completed ledger znode
        final LogSegmentMetadata completedLogSegment =
            inprogressLogSegment.completeLogSegment(
                pathForCompletedLedger,
                lastTxId,
                recordCount,
                lastEntryId,
                lastSlotId,
                startSequenceId);
        setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());

        // prepare the transaction: all of the following mutations commit (or abort) atomically
        Transaction<Object> txn = streamMetadataStore.newTransaction();

        // create completed log segment
        writeLogSegment(txn, completedLogSegment);
        // delete inprogress log segment
        deleteLogSegment(txn, inprogressLogSegment);
        // store max sequence number
        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
        // update max txn id.
        LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
        storeMaxTxId(txn, maxTxId, lastTxId);

        txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                LOG.info("Completed {} to {} for {} : {}",
                    new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
                        getFullyQualifiedName(), completedLogSegment });
                FutureUtils.setValue(promise, completedLogSegment);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.setException(promise, cause);
            }
        }, scheduler));
    }

    /**
     * Recover all inprogress log segments: each one has its last record read back and is then
     * completed and closed. Already-completed segments pass through unchanged.
     *
     * @return future containing the list of log segments after recovery
     */
    public Future<Long> recoverIncompleteLogSegments() {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
        } catch (IOException ioe) {
            return Future.exception(ioe);
        }
        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction);
    }

    /**
     * Function that recovers a single log segment: completed segments are returned as-is;
     * inprogress segments have their last record read to determine the end-of-segment state,
     * then are completed and closed.
     */
    class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {

        @Override
        public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
            if (!l.isInProgress()) {
                return Future.value(l);
            }

            LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
            return asyncReadLastRecord(l, true, true, true).flatMap(
                new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() {
                    @Override
                    public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) {
                        return completeLogSegment(l, lastRecord);
                    }
                });
        }

        /**
         * Complete an inprogress segment using its recovered last record.
         * A null last record means the segment is empty; an INVALID_TXID marks unrecoverable
         * corruption and aborts recovery.
         */
        private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
                                                              LogRecordWithDLSN lastRecord) {
            LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());

            long endTxId = DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
            int recordCount = 0;
            long lastEntryId = -1;
            long lastSlotId = -1;

            if (null != lastRecord) {
                endTxId = lastRecord.getTransactionId();
                recordCount = lastRecord.getLastPositionWithinLogSegment();
                lastEntryId = lastRecord.getDlsn().getEntryId();
                lastSlotId = lastRecord.getDlsn().getSlotId();
            }

            if (endTxId == DistributedLogConstants.INVALID_TXID) {
                LOG.error("Unrecoverable corruption has occurred in segment "
                    + l.toString() + " at path " + l.getZkPath()
                    + ". Unable to continue recovery.");
                return Future.exception(new IOException("Unrecoverable corruption,"
                    + " please check logs."));
            } else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
                // TODO: Empty ledger - Ideally we should just remove it?
                endTxId = l.getFirstTxId();
            }

            Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
            doCompleteAndCloseLogSegment(
                l.getZNodeName(),
                l.getLogSegmentSequenceNumber(),
                l.getLogSegmentId(),
                l.getFirstTxId(),
                endTxId,
                recordCount,
                lastEntryId,
                lastSlotId,
                promise);
            return promise;
        }

    }

    /**
     * Mark all log segments whose records are entirely older than <code>dlsn</code> as
     * truncated; a segment straddling <code>dlsn</code> is partially truncated.
     *
     * @param dlsn position before which segments are truncated; InvalidDLSN is a no-op
     * @return future containing the updated log segment metadata
     */
    Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
        if (DLSN.InvalidDLSN == dlsn) {
            List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
            return Future.value(emptyList);
        }
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
            new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                @Override
                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                    return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
                }
            });
    }

    /**
     * Walk the (sorted) segment list and partition it into fully-truncatable segments and
     * at most one partially-truncatable segment; stops at the first inprogress segment.
     * Finding a second partial-truncation candidate indicates inconsistent metadata and fails
     * the returned future.
     */
    private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
                                                                                  final DLSN dlsn) {
        LOG.debug("Setting truncation status on logs older than {} from {} for {}",
            new Object[]{dlsn, logSegments, getFullyQualifiedName()});
        List<LogSegmentMetadata> truncateList = new ArrayList<LogSegmentMetadata>(logSegments.size());
        LogSegmentMetadata partialTruncate = null;
        LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
        for (int i = 0; i < logSegments.size(); i++) {
            LogSegmentMetadata l = logSegments.get(i);
            if (!l.isInProgress()) {
                if (l.getLastDLSN().compareTo(dlsn) < 0) {
                    // entire segment is older than dlsn -> fully truncate
                    LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), l);
                    truncateList.add(l);
                } else if (l.getFirstDLSN().compareTo(dlsn) < 0) {
                    // Can be satisfied by at most one segment
                    if (null != partialTruncate) {
                        String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
                        LOG.error(logMsg);
                        return Future.exception(new DLIllegalStateException(logMsg));
                    }
                    LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
                    partialTruncate = l;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
        return setLogSegmentTruncationStatus(truncateList, partialTruncate, dlsn);
    }

    /**
     * Number of leading completed segments eligible for purging.
     * Always keeps at least one completed segment (needed for the sequence id) and never
     * counts past the first inprogress segment; returns -1 only when the very first segment
     * is inprogress (callers use this as a loop bound, so a negative value purges nothing).
     */
    private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> logSegments) {
        if (logSegments.isEmpty()) {
            return 0;
        } else {
            // we have to keep at least one completed log segment for sequence id
            int numCandidateLogSegments = 0;
            for (LogSegmentMetadata segment : logSegments) {
                if (segment.isInProgress()) {
                    break;
                } else {
                    ++numCandidateLogSegments;
                }
            }

            return numCandidateLogSegments - 1;
        }
    }

    /**
     * Delete completed log segments whose completion time is older than
     * <code>minTimestampToKeep</code>.
     *
     * @param minTimestampToKeep must be in the past; deletion stops at the first segment
     *                           that is inprogress, too recent, or (under explicit
     *                           application truncation) not yet marked truncated
     * @return future containing the list of deleted segments
     */
    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
        if (minTimestampToKeep >= Utils.nowInMillis()) {
            return Future.exception(new IllegalArgumentException(
                "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
        }
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
            new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                @Override
                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                    List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());

                    int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments);

                    for (int iterator = 0; iterator < numCandidates; iterator++) {
                        LogSegmentMetadata l = logSegments.get(iterator);
                        // When application explicitly truncates segments; timestamp based purge is
                        // only used to cleanup log segments that have been marked for truncation
                        if ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
                            !l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
                            purgeList.add(l);
                        } else {
                            // stop truncating log segments if we find either an inprogress or a partially
                            // truncated log segment
                            break;
                        }
                    }
                    LOG.info("Deleting log segments older than {} for {} : {}",
                        new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
                    return deleteLogSegments(purgeList);
                }
            });
    }

    /**
     * Delete completed log segments whose last transaction id is below
     * <code>minTxIdToKeep</code>. A negative <code>minTxIdToKeep</code> means the whole log
     * is being deleted, so every segment (including inprogress ones) is removed.
     *
     * @param minTxIdToKeep minimum transaction id to retain, or negative to delete all
     * @return future containing the list of deleted segments
     */
    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
            new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                @Override
                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                    int numLogSegmentsToProcess;

                    if (minTxIdToKeep < 0) {
                        // we are deleting the log, we can remove whole log segments
                        numLogSegmentsToProcess = logSegments.size();
                    } else {
                        numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
                    }
                    List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
                    for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
                        LogSegmentMetadata l = logSegments.get(iterator);
                        if ((minTxIdToKeep < 0) ||
                            ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
                                !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
                            purgeList.add(l);
                        } else {
                            // stop truncating log segments if we find either an inprogress or a partially
                            // truncated log segment
                            break;
                        }
                    }
                    return deleteLogSegments(purgeList);
                }
            });
    }

    /**
     * Apply truncation status changes in a single metadata transaction, then refresh the
     * local segment cache with the updated metadata on commit.
     * Segments already marked truncated are skipped; the partial-truncation segment is only
     * rewritten when its min-active DLSN actually advances.
     */
    private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
            final List<LogSegmentMetadata> truncateList,
            LogSegmentMetadata partialTruncate,
            DLSN minActiveDLSN) {
        final List<LogSegmentMetadata> listToTruncate = Lists.newArrayListWithCapacity(truncateList.size() + 1);
        final List<LogSegmentMetadata> listAfterTruncated = Lists.newArrayListWithCapacity(truncateList.size() + 1);
        Transaction<Object> updateTxn = metadataUpdater.transaction();
        for (LogSegmentMetadata l : truncateList) {
            if (!l.isTruncated()) {
                LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentTruncated(updateTxn, l);
                listToTruncate.add(l);
                listAfterTruncated.add(newSegment);
            }
        }

        if (null != partialTruncate && (partialTruncate.isNonTruncated() ||
            (partialTruncate.isPartiallyTruncated() && (partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0)))) {
            LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentPartiallyTruncated(
                updateTxn, partialTruncate, minActiveDLSN);
            listToTruncate.add(partialTruncate);
            listAfterTruncated.add(newSegment);
        }

        return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() {
            @Override
            public List<LogSegmentMetadata> apply(Void value) {
                // swap old metadata for new in the cache, pairwise by index
                for (int i = 0; i < listToTruncate.size(); i++) {
                    removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
                    LogSegmentMetadata newSegment = listAfterTruncated.get(i);
                    addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
                }
                return listAfterTruncated;
            }
        });
    }

    /**
     * Delete a list of log segments (data then metadata), one future per segment.
     */
    private Future<List<LogSegmentMetadata>> deleteLogSegments(
            final List<LogSegmentMetadata> logs) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
        }
        return FutureUtils.processList(logs,
            new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
                @Override
                public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
                    return deleteLogSegment(segment);
                }
            }, scheduler);
    }

    /**
     * Delete one log segment: first the entry store data, then (on success) its metadata.
     * Success/failure latency is recorded in {@code deleteOpStats}.
     */
    private Future<LogSegmentMetadata> deleteLogSegment(
            final LogSegmentMetadata ledgerMetadata) {
        LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
        final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
        final Stopwatch stopwatch = Stopwatch.createStarted();
        promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
            @Override
            public void onSuccess(LogSegmentMetadata segment) {
                deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }

            @Override
            public void onFailure(Throwable cause) {
                deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }
        });
        entryStore.deleteLogSegment(ledgerMetadata)
            .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
                @Override
                public void onFailure(Throwable cause) {
                    FutureUtils.setException(promise, cause);
                }

                @Override
                public void onSuccess(LogSegmentMetadata segment) {
                    deleteLogSegmentMetadata(segment, promise);
                }
            });
        return promise;
    }

    /**
     * Delete a log segment's metadata and evict it from the local cache.
     * A {@link LogSegmentNotFoundException} abort is treated as success — the segment is
     * already gone, so the deletion is idempotent.
     */
    private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
                                          final Promise<LogSegmentMetadata> promise) {
        Transaction<Object> deleteTxn = metadataStore.transaction();
        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
            @Override
            public void onCommit(Void r) {
                // purge log segment
                removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                promise.setValue(segmentMetadata);
            }

            @Override
            public void onAbort(Throwable t) {
                if (t instanceof LogSegmentNotFoundException) {
                    // purge log segment
                    removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                    promise.setValue(segmentMetadata);
                    return;
                } else {
                    LOG.error("Couldn't purge {} for {}: with error {}",
                        new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
                    promise.setException(t);
                }
            }
        });
        deleteTxn.execute();
    }

    @Override
    public Future<Void> asyncClose() {
        // release the lock and the segment allocator in sequence
        return Utils.closeSequence(scheduler,
            lock,
            logSegmentAllocator);
    }

    @Override
    public Future<Void> asyncAbort() {
        return asyncClose();
    }

    /**
     * Get the znode name for a completed log segment.
     * The name encodes the sequence number under the current naming version, or the
     * first/last transaction ids under the legacy scheme.
     */
    String completedLedgerZNodeName(long firstTxId, long lastTxId, long logSegmentSeqNo) {
        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
            return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
        } else {
            return String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
                firstTxId, lastTxId);
        }
    }

    /**
     * Get the znode path for a finalize ledger
     */
    String completedLedgerZNode(long firstTxId, long lastTxId, long logSegmentSeqNo) {
        return String.format("%s/%s", logMetadata.getLogSegmentsPath(),
            completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo));
    }

    /**
     * Get the name of the inprogress znode.
     *
     * @return name of the inprogress znode.
     */
    String inprogressZNodeName(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
            // Lots of the problems are introduced due to different inprogress names with same ledger sequence number.
            return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
        } else {
            // legacy naming: first transaction id rendered in base 16
            return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" + Long.toString(firstTxId, 16);
        }
    }

    /**
     * Get the znode path for the inprogressZNode
     */
    String inprogressZNode(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
    }

    String inprogressZNode(String inprogressZNodeName) {
        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java new file mode 100644 index 0000000..bf89823 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.IdleReaderException;
import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.Utils;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Synchronous Log Reader based on {@link AsyncLogReader}.
 * <p>Entries are pulled from a background {@link ReadAheadEntryReader}; the synchronous
 * {@code readNext}/{@code readBulk} calls drain records from that cache. Any asynchronous
 * failure is stashed in {@code readerException} and rethrown on the next read.
 */
class BKSyncLogReader implements LogReader, AsyncNotification {

    private final BKDistributedLogManager bkdlm;
    private final BKLogReadHandler readHandler;
    // first async error observed; once set, every subsequent read throws it
    private final AtomicReference<IOException> readerException =
            new AtomicReference<IOException>(null);
    private final int maxReadAheadWaitTime;
    private Promise<Void> closeFuture;
    private final Optional<Long> startTransactionId;
    // false until the reader has skipped forward to startTransactionId (if one was given)
    private boolean positioned = false;
    // entry currently being drained of records; null when a new entry must be fetched
    private Entry.Reader currentEntry = null;

    // readahead reader
    ReadAheadEntryReader readAheadReader = null;

    // idle reader settings
    private final boolean shouldCheckIdleReader;
    private final int idleErrorThresholdMillis;

    // Stats
    private final Counter idleReaderError;

    BKSyncLogReader(DistributedLogConfiguration conf,
                    BKDistributedLogManager bkdlm,
                    DLSN startDLSN,
                    Optional<Long> startTransactionId,
                    StatsLogger statsLogger) throws IOException {
        this.bkdlm = bkdlm;
        this.readHandler = bkdlm.createReadHandler(
                Optional.<String>absent(),
                this,
                true);
        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
        this.startTransactionId = startTransactionId;

        // start readahead
        startReadAhead(startDLSN);
        if (!startTransactionId.isPresent()) {
            // no transaction id to seek to, so the reader is positioned immediately
            positioned = true;
        }

        // Stats
        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
    }

    /**
     * Create and start the readahead reader at <code>startDLSN</code>; it begins consuming
     * once the initial log segment list fetch completes.
     */
    private void startReadAhead(DLSN startDLSN) throws IOException {
        readAheadReader = new ReadAheadEntryReader(
                bkdlm.getStreamName(),
                startDLSN,
                bkdlm.getConf(),
                readHandler,
                bkdlm.getReaderEntryStore(),
                bkdlm.getScheduler(),
                Ticker.systemTicker(),
                bkdlm.alertStatsLogger);
        readHandler.registerListener(readAheadReader);
        readHandler.asyncStartFetchLogSegments()
            .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
                @Override
                public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
                    readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
                    readAheadReader.start(logSegments.getValue());
                    return BoxedUnit.UNIT;
                }
            });
    }

    @VisibleForTesting
    ReadAheadEntryReader getReadAheadReader() {
        return readAheadReader;
    }

    @VisibleForTesting
    BKLogReadHandler getReadHandler() {
        return readHandler;
    }

    /**
     * Fetch the next entry from the readahead cache.
     * <p>Non-blocking mode polls once with zero timeout. Blocking mode waits in
     * {@code maxReadAheadWaitTime} slices until readahead catches up, an error is recorded,
     * or an entry arrives; after catch-up it polls one final time, so it may still return null.
     */
    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
        Entry.Reader entry = null;
        if (nonBlocking) {
            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
        } else {
            while (!readAheadReader.isReadAheadCaughtUp()
                    && null == readerException.get()
                    && null == entry) {
                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
            }
            if (null != entry) {
                return entry;
            }
            // reader is caught up
            if (readAheadReader.isReadAheadCaughtUp()
                    && null == readerException.get()) {
                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
            }
            return entry;
        }
    }

    /**
     * Record an idle-reader error and throw it; subsequent reads will keep throwing the
     * first recorded exception.
     */
    private void markReaderAsIdle() throws IdleReaderException {
        idleReaderError.inc();
        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
                + readHandler.getFullyQualifiedName()
                + " is idle for more than " + idleErrorThresholdMillis + " ms");
        readerException.compareAndSet(null, ire);
        throw ire;
    }

    @Override
    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
            throws IOException {
        if (null != readerException.get()) {
            throw readerException.get();
        }
        LogRecordWithDLSN record = doReadNext(nonBlocking);
        // no record is returned, check if the reader becomes idle
        if (null == record && shouldCheckIdleReader) {
            if (readAheadReader.getNumCachedEntries() <= 0 &&
                    readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
                markReaderAsIdle();
            }
        }
        return record;
    }

    /**
     * Core read loop: drains records from {@code currentEntry}, fetching new entries as
     * needed. Control records are skipped; records before {@code startTransactionId} are
     * skipped until the reader is positioned; an end-of-stream record raises
     * {@link EndOfStreamException} permanently.
     */
    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
        LogRecordWithDLSN record = null;

        do {
            // fetch one record until we don't find any entry available in the readahead cache
            while (null == record) {
                if (null == currentEntry) {
                    currentEntry = readNextEntry(nonBlocking);
                    if (null == currentEntry) {
                        return null;
                    }
                }
                record = currentEntry.nextRecord();
                if (null == record) {
                    currentEntry = null;
                }
            }

            // check if we reached the end of stream
            if (record.isEndOfStream()) {
                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
                        + readHandler.getFullyQualifiedName());
                readerException.compareAndSet(null, eos);
                throw eos;
            }
            // skip control records
            if (record.isControl()) {
                record = null;
                continue;
            }
            if (!positioned) {
                if (record.getTransactionId() < startTransactionId.get()) {
                    // still before the requested start transaction id - keep skipping
                    record = null;
                    continue;
                } else {
                    positioned = true;
                    break;
                }
            } else {
                break;
            }
        } while (true);
        return record;
    }

    @Override
    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
            throws IOException {
        LinkedList<LogRecordWithDLSN> retList =
                new LinkedList<LogRecordWithDLSN>();

        int numRead = 0;
        LogRecordWithDLSN record = readNext(nonBlocking);
        while ((null != record)) {
            retList.add(record);
            numRead++;
            if (numRead >= numLogRecords) {
                break;
            }
            record = readNext(nonBlocking);
        }
        return retList;
    }

    @Override
    public Future<Void> asyncClose() {
        Promise<Void> closePromise;
        synchronized (this) {
            // idempotent close: reuse the first close future
            if (null != closeFuture) {
                return closeFuture;
            }
            closeFuture = closePromise = new Promise<Void>();
        }
        readHandler.unregisterListener(readAheadReader);
        readAheadReader.removeStateChangeNotification(this);
        Utils.closeSequence(bkdlm.getScheduler(), true,
                readAheadReader,
                readHandler
        ).proxyTo(closePromise);
        return closePromise;
    }

    @Override
    public void close() throws IOException {
        FutureUtils.result(asyncClose());
    }

    //
    // Notification From ReadHandler
    //

    @Override
    public void notifyOnError(Throwable cause) {
        // keep the first error only; wrap non-IOExceptions
        if (cause instanceof IOException) {
            readerException.compareAndSet(null, (IOException) cause);
        } else {
            readerException.compareAndSet(null, new IOException(cause));
        }
    }

    @Override
    public void notifyOnOperationComplete() {
        // no-op
    }
}
b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog; + +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.util.FutureUtils; + +import java.io.IOException; +import java.util.List; + +class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter { + + public BKSyncLogWriter(DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + BKDistributedLogManager bkdlm) { + super(conf, dynConf, bkdlm); + } + /** + * Write log records to the stream. + * + * @param record operation + */ + @Override + public void write(LogRecord record) throws IOException { + getLedgerWriter(record.getTransactionId(), false).write(record); + } + + /** + * Write edits logs operation to the stream. 
+ * + * @param records list of records + */ + @Override + @Deprecated + public int writeBulk(List<LogRecord> records) throws IOException { + return getLedgerWriter(records.get(0).getTransactionId(), false).writeBulk(records); + } + + /** + * Flushes all the data up to this point, + * adds the end of stream marker and marks the stream + * as read-only in the metadata. No appends to the + * stream will be allowed after this point + */ + @Override + public void markEndOfStream() throws IOException { + FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream()); + closeAndComplete(); + } + + /** + * All data that has been written to the stream so far will be flushed. + * New data can be still written to the stream while flush is ongoing. + */ + @Override + public long setReadyToFlush() throws IOException { + checkClosedOrInError("setReadyToFlush"); + long highestTransactionId = 0; + BKLogSegmentWriter writer = getCachedLogWriter(); + if (null != writer) { + highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.flush())); + } + return highestTransactionId; + } + + /** + * Commit data that is already flushed. + * <p/> + * This API is optional as the writer implements a policy for automatically syncing + * the log records in the buffer. The buffered edits can be flushed when the buffer + * becomes full or a certain period of time is elapsed. + */ + @Override + public long flushAndSync() throws IOException { + checkClosedOrInError("flushAndSync"); + + LOG.debug("FlushAndSync Started"); + long highestTransactionId = 0; + BKLogSegmentWriter writer = getCachedLogWriter(); + if (null != writer) { + highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.commit())); + LOG.debug("FlushAndSync Completed"); + } else { + LOG.debug("FlushAndSync Completed - Nothing to Flush"); + } + return highestTransactionId; + } + + /** + * Close the stream without necessarily flushing immediately. 
+ * This may be called if the stream is in error such as after a + * previous write or close threw an exception. + */ + @Override + public void abort() throws IOException { + super.abort(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java new file mode 100644 index 0000000..6ed662b --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; + +import java.util.concurrent.TimeUnit; + +class BKTransmitPacket { + + private final EntryBuffer recordSet; + private final long transmitTime; + private final Promise<Integer> transmitComplete; + + BKTransmitPacket(EntryBuffer recordSet) { + this.recordSet = recordSet; + this.transmitTime = System.nanoTime(); + this.transmitComplete = new Promise<Integer>(); + } + + EntryBuffer getRecordSet() { + return recordSet; + } + + Promise<Integer> getTransmitFuture() { + return transmitComplete; + } + + /** + * Complete the transmit with result code <code>transmitRc</code>. + * <p>It would notify all the waiters that are waiting via {@link #awaitTransmitComplete(long, TimeUnit)} + * or {@link #addTransmitCompleteListener(FutureEventListener)}. + * + * @param transmitResult + * transmit result code. + */ + public void notifyTransmitComplete(int transmitResult) { + transmitComplete.setValue(transmitResult); + } + + /** + * Register a transmit complete listener. + * <p>The listener will be triggered with transmit result when transmit completes. + * The method should be non-blocking. + * + * @param transmitCompleteListener + * listener on transmit completion + * @see #awaitTransmitComplete(long, TimeUnit) + */ + void addTransmitCompleteListener(FutureEventListener<Integer> transmitCompleteListener) { + transmitComplete.addEventListener(transmitCompleteListener); + } + + /** + * Await for the transmit to be complete + * + * @param timeout + * wait timeout + * @param unit + * wait timeout unit + */ + int awaitTransmitComplete(long timeout, TimeUnit unit) + throws Exception { + return Await.result(transmitComplete, + Duration.fromTimeUnit(timeout, unit)); + } + + public long getTransmitTime() { + return transmitTime; + } + +}