http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java deleted file mode 100644 index 2486297..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java +++ /dev/null @@ -1,1325 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.EndOfStreamException; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; -import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.function.GetLastTxIdFunction; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.lock.DistributedLock; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.logsegment.RollingPolicy; -import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy; -import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.metadata.MetadataUpdater; -import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import com.twitter.distributedlog.util.Allocator; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.FailpointUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Transaction; -import 
com.twitter.distributedlog.util.PermitLimiter; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER; - -/** - * Log Handler for Writers. - * - * <h3>Metrics</h3> - * All the metrics about log write handler are exposed under scope `segments`. - * <ul> - * <li> `segments`/open : opstats. latency characteristics on starting a new log segment. - * <li> `segments`/close : opstats. latency characteristics on completing an inprogress log segment. - * <li> `segments`/recover : opstats. latency characteristics on recovering a log segment. - * <li> `segments`/delete : opstats. latency characteristics on deleting a log segment. 
- * </ul> - */ -class BKLogWriteHandler extends BKLogHandler { - static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); - - private static Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER = - new Transaction.OpListener<LogSegmentEntryWriter>() { - @Override - public void onCommit(LogSegmentEntryWriter r) { - // no-op - } - - @Override - public void onAbort(Throwable t) { - // no-op - } - }; - - protected final LogMetadataForWriter logMetadataForWriter; - protected final Allocator<LogSegmentEntryWriter, Object> logSegmentAllocator; - protected final DistributedLock lock; - protected final MaxTxId maxTxId; - protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo; - protected final boolean validateLogSegmentSequenceNumber; - protected final int regionId; - protected final RollingPolicy rollingPolicy; - protected Future<? extends DistributedLock> lockFuture = null; - protected final PermitLimiter writeLimiter; - protected final FeatureProvider featureProvider; - protected final DynamicDistributedLogConfiguration dynConf; - protected final MetadataUpdater metadataUpdater; - // tracking the inprogress log segments - protected final LinkedList<Long> inprogressLSSNs; - - // Fetch LogSegments State: write can continue without full list of log segments while truncation needs - private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite; - private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation; - - // Recover Functions - private final RecoverLogSegmentFunction recoverLogSegmentFunction = - new RecoverLogSegmentFunction(); - private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction = - new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() { - @Override - public Future<Long> apply(List<LogSegmentMetadata> segmentList) { - LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList); - // if lastLedgerRollingTimeMillis is not updated, we 
set it to now. - synchronized (BKLogWriteHandler.this) { - if (lastLedgerRollingTimeMillis < 0) { - lastLedgerRollingTimeMillis = Utils.nowInMillis(); - } - } - - if (validateLogSegmentSequenceNumber) { - synchronized (inprogressLSSNs) { - for (LogSegmentMetadata segment : segmentList) { - if (segment.isInProgress()) { - inprogressLSSNs.addLast(segment.getLogSegmentSequenceNumber()); - } - } - } - } - - return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map( - GetLastTxIdFunction.INSTANCE); - } - }; - - // Stats - private final StatsLogger perLogStatsLogger; - private final OpStatsLogger closeOpStats; - private final OpStatsLogger openOpStats; - private final OpStatsLogger recoverOpStats; - private final OpStatsLogger deleteOpStats; - - /** - * Construct a Bookkeeper journal manager. - */ - BKLogWriteHandler(LogMetadataForWriter logMetadata, - DistributedLogConfiguration conf, - LogStreamMetadataStore streamMetadataStore, - LogSegmentMetadataCache metadataCache, - LogSegmentEntryStore entryStore, - OrderedScheduler scheduler, - Allocator<LogSegmentEntryWriter, Object> segmentAllocator, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - AlertStatsLogger alertStatsLogger, - String clientId, - int regionId, - PermitLimiter writeLimiter, - FeatureProvider featureProvider, - DynamicDistributedLogConfiguration dynConf, - DistributedLock lock /** owned by handler **/) { - super(logMetadata, - conf, - streamMetadataStore, - metadataCache, - entryStore, - scheduler, - statsLogger, - alertStatsLogger, - clientId); - this.logMetadataForWriter = logMetadata; - this.logSegmentAllocator = segmentAllocator; - this.perLogStatsLogger = perLogStatsLogger; - this.writeLimiter = writeLimiter; - this.featureProvider = featureProvider; - this.dynConf = dynConf; - this.lock = lock; - this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); - - if (conf.getEncodeRegionIDInLogSegmentMetadata()) { - 
this.regionId = regionId; - } else { - this.regionId = DistributedLogConstants.LOCAL_REGION_ID; - } - this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled(); - - // Construct the max sequence no - maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData()); - inprogressLSSNs = new LinkedList<Long>(); - // Construct the max txn id. - maxTxId = new MaxTxId(logMetadata.getMaxTxIdData()); - - // Schedule fetching log segment list in background before we access it. - // We don't need to watch the log segment list changes for writer, as it manages log segment list. - fetchForWrite = readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - WRITE_HANDLE_FILTER, - null); - - // Initialize other parameters. - setLastLedgerRollingTimeMillis(Utils.nowInMillis()); - - // Rolling Policy - if (conf.getLogSegmentRollingIntervalMinutes() > 0) { - rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L); - } else { - rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes()); - } - - // Stats - StatsLogger segmentsStatsLogger = statsLogger.scope("segments"); - openOpStats = segmentsStatsLogger.getOpStatsLogger("open"); - closeOpStats = segmentsStatsLogger.getOpStatsLogger("close"); - recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover"); - deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete"); - } - - private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch( - final Comparator<LogSegmentMetadata> comparator) { - final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); - fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { - try { - FutureUtils.setValue(promise, 
getCachedLogSegments(comparator)); - } catch (UnexpectedException e) { - FutureUtils.setException(promise, e); - } - } - }); - return promise; - } - - private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch( - final Comparator<LogSegmentMetadata> comparator) { - Future<Versioned<List<LogSegmentMetadata>>> result; - synchronized (this) { - if (null == fetchForTruncation) { - fetchForTruncation = readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null); - } - result = fetchForTruncation; - } - - final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>(); - result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> result) { - try { - FutureUtils.setValue(promise, getCachedLogSegments(comparator)); - } catch (UnexpectedException e) { - FutureUtils.setException(promise, e); - } - } - }); - return promise; - } - - // Transactional operations for MaxLogSegmentSequenceNo - void storeMaxSequenceNumber(final Transaction<Object> txn, - final MaxLogSegmentSequenceNo maxSeqNo, - final long seqNo, - final boolean isInprogress) { - metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo), - new Transaction.OpListener<Version>() { - @Override - public void onCommit(Version version) { - if (validateLogSegmentSequenceNumber) { - synchronized (inprogressLSSNs) { - if (isInprogress) { - inprogressLSSNs.add(seqNo); - } else { - inprogressLSSNs.removeFirst(); - } - } - } - maxSeqNo.update(version, seqNo); - } - - @Override - public void onAbort(Throwable t) { - // no-op - } - }); - } - - // Transactional operations for MaxTxId - void storeMaxTxId(final Transaction<Object> txn, - final MaxTxId maxTxId, - final long txId) { - 
metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId), - new Transaction.OpListener<Version>() { - @Override - public void onCommit(Version version) { - maxTxId.update(version, txId); - } - - @Override - public void onAbort(Throwable t) { - // no-op - } - }); - } - - // Transactional operations for logsegment - void writeLogSegment(final Transaction<Object> txn, - final LogSegmentMetadata metadata) { - metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() { - @Override - public void onCommit(Void r) { - addLogSegmentToCache(metadata.getSegmentName(), metadata); - } - - @Override - public void onAbort(Throwable t) { - // no-op - } - }); - } - - void deleteLogSegment(final Transaction<Object> txn, - final LogSegmentMetadata metadata) { - metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() { - @Override - public void onCommit(Void r) { - removeLogSegmentFromCache(metadata.getSegmentName()); - } - - @Override - public void onAbort(Throwable t) { - // no-op - } - }); - } - - /** - * The caller could call this before any actions, which to hold the lock for - * the write handler of its whole lifecycle. The lock will only be released - * when closing the write handler. - * - * This method is useful to prevent releasing underlying zookeeper lock during - * recovering/completing log segments. Releasing underlying zookeeper lock means - * 1) increase latency when re-lock on starting new log segment. 2) increase the - * possibility of a stream being re-acquired by other instances. - * - * @return future represents the lock result - */ - Future<? extends DistributedLock> lockHandler() { - if (null != lockFuture) { - return lockFuture; - } - lockFuture = lock.asyncAcquire(); - return lockFuture; - } - - Future<Void> unlockHandler() { - if (null != lockFuture) { - return lock.asyncClose(); - } else { - return Future.Void(); - } - } - - /** - * Start a new log segment in a BookKeeper ledger. 
- * First ensure that we have the write lock for this journal. - * Then create a ledger and stream based on that ledger. - * The ledger id is written to the inprogress znode, so that in the - * case of a crash, a recovery process can find the ledger we were writing - * to when we crashed. - * - * @param txId First transaction id to be written to the stream - * @return - * @throws IOException - */ - public BKLogSegmentWriter startLogSegment(long txId) throws IOException { - return startLogSegment(txId, false, false); - } - - /** - * Start a new log segment in a BookKeeper ledger. - * First ensure that we have the write lock for this journal. - * Then create a ledger and stream based on that ledger. - * The ledger id is written to the inprogress znode, so that in the - * case of a crash, a recovery process can find the ledger we were writing - * to when we crashed. - * - * @param txId First transaction id to be written to the stream - * @param bestEffort - * @param allowMaxTxID - * allow using max tx id to start log segment - * @return - * @throws IOException - */ - public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) - throws IOException { - Stopwatch stopwatch = Stopwatch.createStarted(); - boolean success = false; - try { - BKLogSegmentWriter writer = doStartLogSegment(txId, bestEffort, allowMaxTxID); - success = true; - return writer; - } finally { - if (success) { - openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } else { - openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - } - } - - protected long assignLogSegmentSequenceNumber() throws IOException { - // For any active stream we will always make sure that there is at least one - // active ledger (except when the stream first starts out). 
Therefore when we - // see no ledger metadata for a stream, we assume that this is the first ledger - // in the stream - long logSegmentSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO; - boolean logSegmentsFound = false; - - if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) { - List<LogSegmentMetadata> ledgerListDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); - Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc); - - if (null == nextLogSegmentSeqNo) { - logSegmentsFound = false; - // we don't find last assigned log segment sequence number - // then we start the log segment with configured FirstLogSegmentSequenceNumber. - logSegmentSeqNo = conf.getFirstLogSegmentSequenceNumber(); - } else { - logSegmentsFound = true; - // latest log segment is assigned with a sequence number, start with next sequence number - logSegmentSeqNo = nextLogSegmentSeqNo; - } - } - - // We only skip log segment sequence number validation only when no log segments found & - // the maximum log segment sequence number is "UNASSIGNED". - if (!logSegmentsFound && - (DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO == maxLogSegmentSequenceNo.getSequenceNumber())) { - // no ledger seqno stored in /ledgers before - LOG.info("No max ledger sequence number found while creating log segment {} for {}.", - logSegmentSeqNo, getFullyQualifiedName()); - } else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 != logSegmentSeqNo) { - LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}", - new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), getFullyQualifiedName(), - getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)}); - // there is max log segment number recorded there and it isn't match. throw exception. 
- throw new DLIllegalStateException("Unexpected max log segment sequence number " - + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName() - + ", expected " + (logSegmentSeqNo - 1)); - } - - return logSegmentSeqNo; - } - - protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException { - return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID)); - } - - protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId, - final boolean bestEffort, - final boolean allowMaxTxID) { - final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>(); - try { - lock.checkOwnershipAndReacquire(); - } catch (LockingException e) { - FutureUtils.setException(promise, e); - return promise; - } - fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> list) { - doStartLogSegment(txId, bestEffort, allowMaxTxID, promise); - } - }); - return promise; - } - - protected void doStartLogSegment(final long txId, - final boolean bestEffort, - final boolean allowMaxTxID, - final Promise<BKLogSegmentWriter> promise) { - // validate the tx id - if ((txId < 0) || - (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) { - FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId)); - return; - } - - long highestTxIdWritten = maxTxId.get(); - if (txId < highestTxIdWritten) { - if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) { - LOG.error("We've already marked the stream as ended and attempting to start a new log segment"); - FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed")); - return; - } else { - LOG.error("We've already seen TxId {} the 
max TXId is {}", txId, highestTxIdWritten); - FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten)); - return; - } - } - - try { - logSegmentAllocator.allocate(); - } catch (IOException e) { - // failed to issue an allocation request - failStartLogSegment(promise, bestEffort, e); - return; - } - - // start the transaction from zookeeper - final Transaction<Object> txn = streamMetadataStore.newTransaction(); - - // failpoint injected before creating ledger - try { - FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate); - } catch (IOException ioe) { - failStartLogSegment(promise, bestEffort, ioe); - return; - } - - logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER) - .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() { - - @Override - public void onSuccess(LogSegmentEntryWriter entryWriter) { - // try-obtain succeed - createInprogressLogSegment( - txn, - txId, - entryWriter, - bestEffort, - promise); - } - - @Override - public void onFailure(Throwable cause) { - failStartLogSegment(promise, bestEffort, cause); - } - }); - } - - private void failStartLogSegment(Promise<BKLogSegmentWriter> promise, - boolean bestEffort, - Throwable cause) { - if (bestEffort) { - FutureUtils.setValue(promise, null); - } else { - FutureUtils.setException(promise, cause); - } - } - - // once the ledger handle is obtained from allocator, this function should guarantee - // either the transaction is executed or aborted. 
Otherwise, the ledger handle will - // just leak from the allocation pool - hence cause "No Ledger Allocator" - private void createInprogressLogSegment(Transaction<Object> txn, - final long txId, - final LogSegmentEntryWriter entryWriter, - boolean bestEffort, - final Promise<BKLogSegmentWriter> promise) { - final long logSegmentSeqNo; - try { - FailpointUtils.checkFailPoint( - FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber); - logSegmentSeqNo = assignLogSegmentSequenceNumber(); - } catch (IOException e) { - // abort the current prepared transaction - txn.abort(e); - failStartLogSegment(promise, bestEffort, e); - return; - } - - final String inprogressZnodePath = inprogressZNode( - entryWriter.getLogSegmentId(), txId, logSegmentSeqNo); - final LogSegmentMetadata l = - new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath, - conf.getDLLedgerMetadataLayoutVersion(), entryWriter.getLogSegmentId(), txId) - .setLogSegmentSequenceNo(logSegmentSeqNo) - .setRegionId(regionId) - .setEnvelopeEntries( - LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion())) - .build(); - - // Create an inprogress segment - writeLogSegment(txn, l); - - // Try storing max sequence number. - LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo); - storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, true); - - // Try storing max tx id. 
- LOG.debug("Try storing MaxTxId in startLogSegment {} {}", inprogressZnodePath, txId); - storeMaxTxId(txn, maxTxId, txId); - - txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() { - - @Override - public void onSuccess(Void value) { - try { - FutureUtils.setValue(promise, new BKLogSegmentWriter( - getFullyQualifiedName(), - l.getSegmentName(), - conf, - conf.getDLLedgerMetadataLayoutVersion(), - entryWriter, - lock, - txId, - logSegmentSeqNo, - scheduler, - statsLogger, - perLogStatsLogger, - alertStatsLogger, - writeLimiter, - featureProvider, - dynConf)); - } catch (IOException ioe) { - failStartLogSegment(promise, false, ioe); - } - } - - @Override - public void onFailure(Throwable cause) { - failStartLogSegment(promise, false, cause); - } - }, scheduler)); - } - - boolean shouldStartNewSegment(BKLogSegmentWriter writer) { - return rollingPolicy.shouldRollover(writer, lastLedgerRollingTimeMillis); - } - - /** - * Finalize a log segment. If the journal manager is currently - * writing to a ledger, ensure that this is the ledger of the log segment - * being finalized. - * <p/> - * Otherwise this is the recovery case. In the recovery case, ensure that - * the firstTxId of the ledger matches firstTxId for the segment we are - * trying to finalize. - */ - Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) { - final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>(); - completeAndCloseLogSegment(writer, promise); - return promise; - } - - private void completeAndCloseLogSegment(final BKLogSegmentWriter writer, - final Promise<LogSegmentMetadata> promise) { - writer.asyncClose().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - // in theory closeToFinalize should throw exception if a stream is in error. - // just in case, add another checking here to make sure we don't close log segment is a stream is in error. 
- if (writer.shouldFailCompleteLogSegment()) { - FutureUtils.setException(promise, - new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error.")); - return; - } - doCompleteAndCloseLogSegment( - inprogressZNodeName(writer.getLogSegmentId(), writer.getStartTxId(), writer.getLogSegmentSequenceNumber()), - writer.getLogSegmentSequenceNumber(), - writer.getLogSegmentId(), - writer.getStartTxId(), - writer.getLastTxId(), - writer.getPositionWithinLogSegment(), - writer.getLastDLSN().getEntryId(), - writer.getLastDLSN().getSlotId(), - promise); - } - - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(promise, cause); - } - }); - } - - @VisibleForTesting - LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo, - long logSegmentId, - long firstTxId, - long lastTxId, - int recordCount) - throws IOException { - return completeAndCloseLogSegment(inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo), logSegmentSeqNo, - logSegmentId, firstTxId, lastTxId, recordCount, -1, -1); - } - - /** - * Finalize a log segment. If the journal manager is currently - * writing to a ledger, ensure that this is the ledger of the log segment - * being finalized. - * <p/> - * Otherwise this is the recovery case. In the recovery case, ensure that - * the firstTxId of the ledger matches firstTxId for the segment we are - * trying to finalize. 
- */ - LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long logSegmentSeqNo, - long logSegmentId, long firstTxId, long lastTxId, - int recordCount, long lastEntryId, long lastSlotId) - throws IOException { - Stopwatch stopwatch = Stopwatch.createStarted(); - boolean success = false; - try { - LogSegmentMetadata completedLogSegment = - doCompleteAndCloseLogSegment(inprogressZnodeName, logSegmentSeqNo, - logSegmentId, firstTxId, lastTxId, recordCount, - lastEntryId, lastSlotId); - success = true; - return completedLogSegment; - } finally { - if (success) { - closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } else { - closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - } - } - - protected long computeStartSequenceId(LogSegmentMetadata segment) throws IOException { - if (!segment.isInProgress()) { - return segment.getStartSequenceId(); - } - - long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID; - - // we only record sequence id when both write version and logsegment's version support sequence id - if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion()) - && segment.supportsSequenceId()) { - List<LogSegmentMetadata> logSegmentDescList = - getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR); - startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment); - } - - return startSequenceId; - } - - /** - * Close log segment - * - * @param inprogressZnodeName - * @param logSegmentSeqNo - * @param logSegmentId - * @param firstTxId - * @param lastTxId - * @param recordCount - * @param lastEntryId - * @param lastSlotId - * @throws IOException - */ - protected LogSegmentMetadata doCompleteAndCloseLogSegment( - String inprogressZnodeName, - long logSegmentSeqNo, - long logSegmentId, - long firstTxId, - long lastTxId, - int recordCount, - long lastEntryId, - long lastSlotId) throws IOException { - 
// NOTE(review): reconstructed from a collapsed deletion diff; `-` diff markers removed.
// The lines below are the tail of completeAndCloseLogSegment(...): it bridges the
// synchronous API onto the asynchronous path and blocks for the result.
        Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
        doCompleteAndCloseLogSegment(
            inprogressZnodeName,
            logSegmentSeqNo,
            logSegmentId,
            firstTxId,
            lastTxId,
            recordCount,
            lastEntryId,
            lastSlotId,
            promise);
        return FutureUtils.result(promise);
    }

    /**
     * Asynchronously complete an inprogress log segment.
     * Waits for the in-flight log segment list fetch ({@code fetchForWrite}) to finish
     * before doing the actual completion; a fetch failure fails {@code promise} directly.
     */
    protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
                                                final long logSegmentSeqNo,
                                                final long logSegmentId,
                                                final long firstTxId,
                                                final long lastTxId,
                                                final int recordCount,
                                                final long lastEntryId,
                                                final long lastSlotId,
                                                final Promise<LogSegmentMetadata> promise) {
        fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.setException(promise, cause);
            }

            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
                    inprogressZnodeName,
                    logSegmentSeqNo,
                    logSegmentId,
                    firstTxId,
                    lastTxId,
                    recordCount,
                    lastEntryId,
                    lastSlotId,
                    promise);
            }
        });
    }

    /**
     * Completion body: validates the inprogress segment against the caller-supplied
     * identifiers, then atomically (one metadata transaction) writes the completed
     * segment znode, deletes the inprogress znode, and stores max sequence number and
     * max txn id. Every early-exit path fails {@code promise}; nothing throws.
     */
    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
            final String inprogressZnodeName,
            long logSegmentSeqNo,
            long logSegmentId,
            long firstTxId,
            long lastTxId,
            int recordCount,
            long lastEntryId,
            long lastSlotId,
            final Promise<LogSegmentMetadata> promise) {
        // re-check the lock before mutating metadata; losing ownership aborts completion
        try {
            lock.checkOwnershipAndReacquire();
        } catch (IOException ioe) {
            FutureUtils.setException(promise, ioe);
            return;
        }

        LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
        LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);

        // validate log segment
        if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
            FutureUtils.setException(promise, new IOException(
                "Active ledger has different ID to inprogress. "
                    + inprogressLogSegment.getLogSegmentId() + " found, "
                    + logSegmentId + " expected"));
            return;
        }
        // validate the transaction id
        if (inprogressLogSegment.getFirstTxId() != firstTxId) {
            FutureUtils.setException(promise, new IOException("Transaction id not as expected, "
                + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
            return;
        }
        // validate the log sequence number
        if (validateLogSegmentSequenceNumber) {
            synchronized (inprogressLSSNs) {
                if (inprogressLSSNs.isEmpty()) {
                    FutureUtils.setException(promise, new UnexpectedException(
                        "Didn't find matched inprogress log segments when completing inprogress "
                            + inprogressLogSegment));
                    return;
                }
                long leastInprogressLSSN = inprogressLSSNs.getFirst();
                // the log segment sequence number in metadata {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
                // should be same as the sequence number we are completing (logSegmentSeqNo)
                // and
                // it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs}
                if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
                    (leastInprogressLSSN != logSegmentSeqNo)) {
                    FutureUtils.setException(promise, new UnexpectedException(
                        "Didn't find matched inprogress log segments when completing inprogress "
                            + inprogressLogSegment));
                    return;
                }
            }
        }

        // store max sequence number.
        long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
        if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo ||
            (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
            // ignore the case that a new inprogress log segment is pre-allocated
            // before completing current inprogress one
            LOG.info("Try storing max sequence number {} in completing {}.",
                new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
        } else {
            LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
            if (validateLogSegmentSequenceNumber) {
                FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
                    + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
                    + ", expected " + (logSegmentSeqNo - 1)));
                return;
            }
        }

        // Prepare the completion
        final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
        long startSequenceId;
        try {
            startSequenceId = computeStartSequenceId(inprogressLogSegment);
        } catch (IOException ioe) {
            FutureUtils.setException(promise, ioe);
            return;
        }
        // write completed ledger znode
        final LogSegmentMetadata completedLogSegment =
            inprogressLogSegment.completeLogSegment(
                pathForCompletedLedger,
                lastTxId,
                recordCount,
                lastEntryId,
                lastSlotId,
                startSequenceId);
        setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());

        // prepare the transaction
        Transaction<Object> txn = streamMetadataStore.newTransaction();

        // create completed log segment
        writeLogSegment(txn, completedLogSegment);
        // delete inprogress log segment
        deleteLogSegment(txn, inprogressLogSegment);
        // store max sequence number
        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
        // update max txn id.
        LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
        storeMaxTxId(txn, maxTxId, lastTxId);

        txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                LOG.info("Completed {} to {} for {} : {}",
                    new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
                        getFullyQualifiedName(), completedLogSegment });
                FutureUtils.setValue(promise, completedLogSegment);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.setException(promise, cause);
            }
        }, scheduler));
    }

    /**
     * Recover all inprogress log segments (after the first segment list fetch),
     * honoring the FP_RecoverIncompleteLogSegments failpoint for testing.
     */
    public Future<Long> recoverIncompleteLogSegments() {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
        } catch (IOException ioe) {
            return Future.exception(ioe);
        }
        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction);
    }

    /**
     * Per-segment recovery step: completed segments pass through unchanged; an
     * inprogress segment is closed at its recovered last record.
     */
    class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {

        @Override
        public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
            if (!l.isInProgress()) {
                return Future.value(l);
            }

            LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
            return asyncReadLastRecord(l, true, true, true).flatMap(
                new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() {
                    @Override
                    public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) {
                        return completeLogSegment(l, lastRecord);
                    }
                });
        }

        /**
         * Close segment {@code l} using the recovered {@code lastRecord}; a null
         * record means the segment is empty and is closed at its first txn id.
         */
        private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
                                                              LogRecordWithDLSN lastRecord) {
            LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());

            long endTxId = DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
            int recordCount = 0;
            long lastEntryId = -1;
            long lastSlotId = -1;

            if (null != lastRecord) {
                endTxId = lastRecord.getTransactionId();
                recordCount = lastRecord.getLastPositionWithinLogSegment();
                lastEntryId = lastRecord.getDlsn().getEntryId();
                lastSlotId = lastRecord.getDlsn().getSlotId();
            }

            if (endTxId == DistributedLogConstants.INVALID_TXID) {
                LOG.error("Unrecoverable corruption has occurred in segment "
                    + l.toString() + " at path " + l.getZkPath()
                    + ". Unable to continue recovery.");
                return Future.exception(new IOException("Unrecoverable corruption,"
                    + " please check logs."));
            } else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
                // TODO: Empty ledger - Ideally we should just remove it?
                endTxId = l.getFirstTxId();
            }

            Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
            doCompleteAndCloseLogSegment(
                l.getZNodeName(),
                l.getLogSegmentSequenceNumber(),
                l.getLogSegmentId(),
                l.getFirstTxId(),
                endTxId,
                recordCount,
                lastEntryId,
                lastSlotId,
                promise);
            return promise;
        }

    }

    /**
     * Mark all log segments whose records fall entirely before {@code dlsn} as
     * truncated. An invalid DLSN is a no-op returning an empty list.
     */
    Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
        if (DLSN.InvalidDLSN == dlsn) {
            List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
            return Future.value(emptyList);
        }
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
            new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                @Override
                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                    return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
                }
            });
    }

    /**
     * Walk completed segments in order: fully-before-dlsn segments are queued for
     * full truncation, at most one segment straddling the dlsn is partially
     * truncated, and scanning stops at the first inprogress or newer segment.
     */
    private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
                                                                                  final DLSN dlsn) {
        LOG.debug("Setting truncation status on logs older than {} from {} for {}",
            new Object[]{dlsn, logSegments, getFullyQualifiedName()});
        List<LogSegmentMetadata> truncateList = new ArrayList<LogSegmentMetadata>(logSegments.size());
        LogSegmentMetadata partialTruncate = null;
        LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
        for (int i = 0; i < logSegments.size(); i++) {
            LogSegmentMetadata l = logSegments.get(i);
            if (!l.isInProgress()) {
                if (l.getLastDLSN().compareTo(dlsn) < 0) {
                    LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), l);
                    truncateList.add(l);
                } else if (l.getFirstDLSN().compareTo(dlsn) < 0) {
                    // Can be satisfied by at most one segment
                    if (null != partialTruncate) {
                        String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
                        LOG.error(logMsg);
                        return Future.exception(new DLIllegalStateException(logMsg));
                    }
                    LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
                    partialTruncate = l;
                } else {
                    break;
                }
            } else {
                break;
            }
        }
        return setLogSegmentTruncationStatus(truncateList, partialTruncate, dlsn);
    }

    /**
     * Number of leading completed segments eligible for purge; always keeps at
     * least one completed segment (for sequence id continuity) and never counts
     * past the first inprogress segment. May return -1 for an all-complete
     * single-segment list; callers use it only as a loop bound.
     */
    private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> logSegments) {
        if (logSegments.isEmpty()) {
            return 0;
        } else {
            // we have to keep at least one completed log segment for sequence id
            int numCandidateLogSegments = 0;
            for (LogSegmentMetadata segment : logSegments) {
                if (segment.isInProgress()) {
                    break;
                } else {
                    ++numCandidateLogSegments;
                }
            }

            return numCandidateLogSegments - 1;
        }
    }

    /**
     * Delete completed (and, when explicit truncation is enabled, already-truncated)
     * log segments whose completion time predates {@code minTimestampToKeep}.
     * Rejects timestamps at/after "now".
     */
    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
        if (minTimestampToKeep >= Utils.nowInMillis()) {
            return Future.exception(new IllegalArgumentException(
                "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
        }
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
            new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                @Override
                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                    List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());

                    int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments);

                    for (int iterator = 0; iterator < numCandidates; iterator++) {
                        LogSegmentMetadata l = logSegments.get(iterator);
                        // When application explicitly truncates segments; timestamp based purge is
                        // only used to cleanup log segments that have been marked for truncation
                        if ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
                            !l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
                            purgeList.add(l);
                        } else {
                            // stop truncating log segments if we find either an inprogress or a partially
                            // truncated log segment
                            break;
                        }
                    }
                    LOG.info("Deleting log segments older than {} for {} : {}",
                        new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
                    return deleteLogSegments(purgeList);
                }
            });
    }

    /**
     * Delete log segments whose last txn id is below {@code minTxIdToKeep}.
     * A negative threshold means "delete the whole log" and removes every segment.
     */
    Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
            new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
                @Override
                public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
                    int numLogSegmentsToProcess;

                    if (minTxIdToKeep < 0) {
                        // we are deleting the log, we can remove whole log segments
                        numLogSegmentsToProcess = logSegments.size();
                    } else {
                        numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
                    }
                    List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
                    for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
                        LogSegmentMetadata l = logSegments.get(iterator);
                        if ((minTxIdToKeep < 0) ||
                            ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
                                !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
                            purgeList.add(l);
                        } else {
                            // stop truncating log segments if we find either an inprogress or a partially
                            // truncated log segment
                            break;
                        }
                    }
                    return deleteLogSegments(purgeList);
                }
            });
    }

    /**
     * Apply truncation status updates in one metadata transaction, then swap the
     * updated segments into the local cache on commit. Already-truncated segments
     * and a partial segment whose min-active DLSN is not advancing are skipped.
     */
    private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
            final List<LogSegmentMetadata> truncateList,
            LogSegmentMetadata partialTruncate,
            DLSN minActiveDLSN) {
        final List<LogSegmentMetadata> listToTruncate = Lists.newArrayListWithCapacity(truncateList.size() + 1);
        final List<LogSegmentMetadata> listAfterTruncated = Lists.newArrayListWithCapacity(truncateList.size() + 1);
        Transaction<Object> updateTxn = metadataUpdater.transaction();
        for (LogSegmentMetadata l : truncateList) {
            if (!l.isTruncated()) {
                LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentTruncated(updateTxn, l);
                listToTruncate.add(l);
                listAfterTruncated.add(newSegment);
            }
        }

        if (null != partialTruncate && (partialTruncate.isNonTruncated() ||
            (partialTruncate.isPartiallyTruncated() && (partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0)))) {
            LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentPartiallyTruncated(
                updateTxn, partialTruncate, minActiveDLSN);
            listToTruncate.add(partialTruncate);
            listAfterTruncated.add(newSegment);
        }

        return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() {
            @Override
            public List<LogSegmentMetadata> apply(Void value) {
                for (int i = 0; i < listToTruncate.size(); i++) {
                    removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
                    LogSegmentMetadata newSegment = listAfterTruncated.get(i);
                    addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
                }
                return listAfterTruncated;
            }
        });
    }

    /**
     * Delete each segment in {@code logs} (data then metadata), one future per segment.
     */
    private Future<List<LogSegmentMetadata>> deleteLogSegments(
            final List<LogSegmentMetadata> logs) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
        }
        return FutureUtils.processList(logs,
            new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
                @Override
                public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
                    return deleteLogSegment(segment);
                }
            }, scheduler);
    }

    /**
     * Delete one segment: entry-store data first, then its metadata; records
     * delete latency (micros) on the overall promise.
     */
    private Future<LogSegmentMetadata> deleteLogSegment(
            final LogSegmentMetadata ledgerMetadata) {
        LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
        final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
        final Stopwatch stopwatch = Stopwatch.createStarted();
        promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
            @Override
            public void onSuccess(LogSegmentMetadata segment) {
                deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }

            @Override
            public void onFailure(Throwable cause) {
                deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }
        });
        entryStore.deleteLogSegment(ledgerMetadata)
            .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
                @Override
                public void onFailure(Throwable cause) {
                    FutureUtils.setException(promise, cause);
                }

                @Override
                public void onSuccess(LogSegmentMetadata segment) {
                    deleteLogSegmentMetadata(segment, promise);
                }
            });
        return promise;
    }

    /**
     * Delete segment metadata; a LogSegmentNotFoundException on abort is treated
     * as success (segment already gone), any other abort cause fails the promise.
     */
    private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
                                          final Promise<LogSegmentMetadata> promise) {
        Transaction<Object> deleteTxn = metadataStore.transaction();
        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
            @Override
            public void onCommit(Void r) {
                // purge log segment
                removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                promise.setValue(segmentMetadata);
            }

            @Override
            public void onAbort(Throwable t) {
                if (t instanceof LogSegmentNotFoundException) {
                    // purge log segment
                    removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                    promise.setValue(segmentMetadata);
                    return;
                } else {
                    LOG.error("Couldn't purge {} for {}: with error {}",
                        new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
                    promise.setException(t);
                }
            }
        });
        deleteTxn.execute();
    }

    /** Close the lock and the segment allocator in sequence. */
    @Override
    public Future<Void> asyncClose() {
        return Utils.closeSequence(scheduler,
            lock,
            logSegmentAllocator);
    }

    /** Abort is equivalent to close for the write handler. */
    @Override
    public Future<Void> asyncAbort() {
        return asyncClose();
    }

    /**
     * Name of a completed segment znode: sequence-number based for the current
     * name version, txid-range based for the legacy version.
     */
    String completedLedgerZNodeName(long firstTxId, long lastTxId, long logSegmentSeqNo) {
        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
            return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
        } else {
            return String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
                firstTxId, lastTxId);
        }
    }

    /**
     * Get the znode path for a finalize ledger
     */
    String completedLedgerZNode(long firstTxId, long lastTxId, long logSegmentSeqNo) {
        return String.format("%s/%s", logMetadata.getLogSegmentsPath(),
            completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo));
    }

    /**
     * Get the name of the inprogress znode.
     *
     * @return name of the inprogress znode.
     */
    String inprogressZNodeName(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
        if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
            // Lots of the problems are introduced due to different inprogress names with same ledger sequence number.
            return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
        } else {
            // legacy naming: hex-encoded first txid
            return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" + Long.toString(firstTxId, 16);
        }
    }

    /**
     * Get the znode path for the inprogressZNode
     */
    String inprogressZNode(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
    }

    /** Znode path for an inprogress segment given its znode name. */
    String inprogressZNode(String inprogressZNodeName) {
        return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java deleted file mode 100644 index 308f42a..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReader.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
// NOTE(review): reconstructed from a collapsed deletion diff; `-` diff markers removed.
package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
import com.twitter.distributedlog.exceptions.IdleReaderException;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Synchronous Log Reader based on {@link AsyncLogReader}
 */
class BKSyncLogReader implements LogReader, AsyncNotification {

    private final BKDistributedLogManager bkdlm;
    private final BKLogReadHandler readHandler;
    // first fatal reader error wins (compareAndSet); once set, readNext always rethrows it
    private final AtomicReference<IOException> readerException =
        new AtomicReference<IOException>(null);
    private final int maxReadAheadWaitTime;
    private Promise<Void> closeFuture;
    private final Optional<Long> startTransactionId;
    // false until the reader has skipped past records below startTransactionId
    private boolean positioned = false;
    private Entry.Reader currentEntry = null;

    // readahead reader
    ReadAheadEntryReader readAheadReader = null;

    // idle reader settings
    private final boolean shouldCheckIdleReader;
    private final int idleErrorThresholdMillis;

    // Stats
    private final Counter idleReaderError;

    BKSyncLogReader(DistributedLogConfiguration conf,
                    BKDistributedLogManager bkdlm,
                    DLSN startDLSN,
                    Optional<Long> startTransactionId,
                    StatsLogger statsLogger) throws IOException {
        this.bkdlm = bkdlm;
        this.readHandler = bkdlm.createReadHandler(
            Optional.<String>absent(),
            this,
            true);
        this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
        this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis();
        // idle detection is active only for a meaningful positive threshold
        this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE;
        this.startTransactionId = startTransactionId;

        // start readahead
        startReadAhead(startDLSN);
        if (!startTransactionId.isPresent()) {
            // no txid filter requested: reader is positioned from the start DLSN
            positioned = true;
        }

        // Stats
        StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader");
        idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error");
    }

    /**
     * Create and start the readahead reader at {@code startDLSN}; it begins
     * consuming once the initial log segment list fetch completes.
     */
    private void startReadAhead(DLSN startDLSN) throws IOException {
        readAheadReader = new ReadAheadEntryReader(
            bkdlm.getStreamName(),
            startDLSN,
            bkdlm.getConf(),
            readHandler,
            bkdlm.getReaderEntryStore(),
            bkdlm.getScheduler(),
            Ticker.systemTicker(),
            bkdlm.alertStatsLogger);
        readHandler.registerListener(readAheadReader);
        readHandler.asyncStartFetchLogSegments()
            .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
                @Override
                public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
                    readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
                    readAheadReader.start(logSegments.getValue());
                    return BoxedUnit.UNIT;
                }
            });
    }

    @VisibleForTesting
    ReadAheadEntryReader getReadAheadReader() {
        return readAheadReader;
    }

    @VisibleForTesting
    BKLogReadHandler getReadHandler() {
        return readHandler;
    }

    /**
     * Fetch the next entry from the readahead cache.
     * Non-blocking mode polls once with zero wait; blocking mode polls with
     * {@code maxReadAheadWaitTime} until readahead catches up (plus one final poll
     * once caught up), returning null if nothing arrives.
     */
    private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException {
        Entry.Reader entry = null;
        if (nonBlocking) {
            return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS);
        } else {
            while (!readAheadReader.isReadAheadCaughtUp()
                && null == readerException.get()
                && null == entry) {
                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
            }
            if (null != entry) {
                return entry;
            }
            // reader is caught up
            if (readAheadReader.isReadAheadCaughtUp()
                && null == readerException.get()) {
                entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS);
            }
            return entry;
        }
    }

    /**
     * Record the idle condition: bump the stat, latch an IdleReaderException as
     * the reader error (so subsequent reads fail too), and throw it.
     */
    private void markReaderAsIdle() throws IdleReaderException {
        idleReaderError.inc();
        IdleReaderException ire = new IdleReaderException("Sync reader on stream "
            + readHandler.getFullyQualifiedName()
            + " is idle for more than " + idleErrorThresholdMillis + " ms");
        readerException.compareAndSet(null, ire);
        throw ire;
    }

    @Override
    public synchronized LogRecordWithDLSN readNext(boolean nonBlocking)
            throws IOException {
        // rethrow any latched fatal error first
        if (null != readerException.get()) {
            throw readerException.get();
        }
        LogRecordWithDLSN record = doReadNext(nonBlocking);
        // no record is returned, check if the reader becomes idle
        if (null == record && shouldCheckIdleReader) {
            if (readAheadReader.getNumCachedEntries() <= 0 &&
                readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) {
                markReaderAsIdle();
            }
        }
        return record;
    }

    /**
     * Core read loop: pulls records out of the current entry (fetching the next
     * entry when exhausted), skipping control records and, until positioned,
     * records with txid below startTransactionId. Throws EndOfStreamException
     * (also latched) when an end-of-stream marker is read.
     */
    private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException {
        LogRecordWithDLSN record = null;

        do {
            // fetch one record until we don't find any entry available in the readahead cache
            while (null == record) {
                if (null == currentEntry) {
                    currentEntry = readNextEntry(nonBlocking);
                    if (null == currentEntry) {
                        return null;
                    }
                }
                record = currentEntry.nextRecord();
                if (null == record) {
                    currentEntry = null;
                }
            }

            // check if we reached the end of stream
            if (record.isEndOfStream()) {
                EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for "
                    + readHandler.getFullyQualifiedName());
                readerException.compareAndSet(null, eos);
                throw eos;
            }
            // skip control records
            if (record.isControl()) {
                record = null;
                continue;
            }
            if (!positioned) {
                if (record.getTransactionId() < startTransactionId.get()) {
                    record = null;
                    continue;
                } else {
                    positioned = true;
                    break;
                }
            } else {
                break;
            }
        } while (true);
        return record;
    }

    @Override
    public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords)
            throws IOException {
        LinkedList<LogRecordWithDLSN> retList =
            new LinkedList<LogRecordWithDLSN>();

        int numRead = 0;
        LogRecordWithDLSN record = readNext(nonBlocking);
        while ((null != record)) {
            retList.add(record);
            numRead++;
            if (numRead >= numLogRecords) {
                break;
            }
            record = readNext(nonBlocking);
        }
        return retList;
    }

    @Override
    public Future<Void> asyncClose() {
        Promise<Void> closePromise;
        synchronized (this) {
            // idempotent: subsequent calls return the same future
            if (null != closeFuture) {
                return closeFuture;
            }
            closeFuture = closePromise = new Promise<Void>();
        }
        readHandler.unregisterListener(readAheadReader);
        readAheadReader.removeStateChangeNotification(this);
        Utils.closeSequence(bkdlm.getScheduler(), true,
            readAheadReader,
            readHandler
        ).proxyTo(closePromise);
        return closePromise;
    }

    @Override
    public void close() throws IOException {
        FutureUtils.result(asyncClose());
    }

    //
    // Notification From ReadHandler
    //

    @Override
    public void notifyOnError(Throwable cause) {
        // latch the first error only; wrap non-IOException causes
        if (cause instanceof IOException) {
            readerException.compareAndSet(null, (IOException) cause);
        } else {
            readerException.compareAndSet(null, new IOException(cause));
        }
    }

    @Override
    public void notifyOnOperationComplete() {
        // no-op
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// NOTE(review): reconstructed from a collapsed deletion diff; `-` diff markers removed.
package com.twitter.distributedlog;

import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.util.FutureUtils;

import java.io.IOException;
import java.util.List;

/**
 * Synchronous log writer: thin blocking facade over the async segment writer
 * machinery in {@link BKAbstractLogWriter}.
 */
class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {

    public BKSyncLogWriter(DistributedLogConfiguration conf,
                           DynamicDistributedLogConfiguration dynConf,
                           BKDistributedLogManager bkdlm) {
        super(conf, dynConf, bkdlm);
    }

    /**
     * Write log records to the stream.
     *
     * @param record operation
     */
    @Override
    public void write(LogRecord record) throws IOException {
        getLedgerWriter(record.getTransactionId(), false).write(record);
    }

    /**
     * Write edits logs operation to the stream.
     *
     * @param records list of records
     */
    @Override
    @Deprecated
    public int writeBulk(List<LogRecord> records) throws IOException {
        // segment selection keys off the first record's txid
        return getLedgerWriter(records.get(0).getTransactionId(), false).writeBulk(records);
    }

    /**
     * Flushes all the data up to this point,
     * adds the end of stream marker and marks the stream
     * as read-only in the metadata. No appends to the
     * stream will be allowed after this point
     */
    @Override
    public void markEndOfStream() throws IOException {
        FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream());
        closeAndComplete();
    }

    /**
     * All data that has been written to the stream so far will be flushed.
     * New data can be still written to the stream while flush is ongoing.
     *
     * @return highest transaction id flushed, or 0 when there is no cached writer
     */
    @Override
    public long setReadyToFlush() throws IOException {
        checkClosedOrInError("setReadyToFlush");
        long highestTransactionId = 0;
        BKLogSegmentWriter writer = getCachedLogWriter();
        if (null != writer) {
            highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.flush()));
        }
        return highestTransactionId;
    }

    /**
     * Commit data that is already flushed.
     * <p/>
     * This API is optional as the writer implements a policy for automatically syncing
     * the log records in the buffer. The buffered edits can be flushed when the buffer
     * becomes full or a certain period of time is elapsed.
     *
     * @return highest transaction id committed, or 0 when there is no cached writer
     */
    @Override
    public long flushAndSync() throws IOException {
        checkClosedOrInError("flushAndSync");

        LOG.debug("FlushAndSync Started");
        long highestTransactionId = 0;
        BKLogSegmentWriter writer = getCachedLogWriter();
        if (null != writer) {
            highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.commit()));
            LOG.debug("FlushAndSync Completed");
        } else {
            LOG.debug("FlushAndSync Completed - Nothing to Flush");
        }
        return highestTransactionId;
    }

    /**
     * Close the stream without necessarily flushing immediately.
     * This may be called if the stream is in error such as after a
     * previous write or close threw an exception.
     */
    @Override
    public void abort() throws IOException {
        super.abort();
    }
}
// NOTE(review): reconstructed from a collapsed deletion diff; `-` diff markers removed.
package com.twitter.distributedlog;

import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;

import java.util.concurrent.TimeUnit;

/**
 * A unit of transmission: an {@link EntryBuffer} paired with a promise that is
 * fulfilled with the transmit result code, plus the nano timestamp at which the
 * packet was created.
 */
class BKTransmitPacket {

    private final EntryBuffer records;
    private final long createdNanos;
    private final Promise<Integer> completion;

    BKTransmitPacket(EntryBuffer recordSet) {
        this.records = recordSet;
        this.createdNanos = System.nanoTime();
        this.completion = new Promise<Integer>();
    }

    /** Record set carried by this packet. */
    EntryBuffer getRecordSet() {
        return records;
    }

    /** Promise fulfilled with the transmit result code. */
    Promise<Integer> getTransmitFuture() {
        return completion;
    }

    /** Creation time of this packet, in {@link System#nanoTime()} units. */
    public long getTransmitTime() {
        return createdNanos;
    }

    /**
     * Complete the transmit with result code <code>transmitRc</code>.
     * <p>It would notify all the waiters that are waiting via {@link #awaitTransmitComplete(long, TimeUnit)}
     * or {@link #addTransmitCompleteListener(FutureEventListener)}.
     *
     * @param transmitResult
     *          transmit result code.
     */
    public void notifyTransmitComplete(int transmitResult) {
        completion.setValue(transmitResult);
    }

    /**
     * Register a transmit complete listener.
     * <p>The listener will be triggered with transmit result when transmit completes.
     * The method should be non-blocking.
     *
     * @param transmitCompleteListener
     *          listener on transmit completion
     * @see #awaitTransmitComplete(long, TimeUnit)
     */
    void addTransmitCompleteListener(FutureEventListener<Integer> transmitCompleteListener) {
        completion.addEventListener(transmitCompleteListener);
    }

    /**
     * Await for the transmit to be complete
     *
     * @param timeout
     *          wait timeout
     * @param unit
     *          wait timeout unit
     */
    int awaitTransmitComplete(long timeout, TimeUnit unit)
            throws Exception {
        return Await.result(completion,
            Duration.fromTimeUnit(timeout, unit));
    }

}