http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java deleted file mode 100644 index 0cf8ed5..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java +++ /dev/null @@ -1,715 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.callback.LogSegmentNamesListener; -import com.twitter.distributedlog.exceptions.LogEmptyException; -import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.metadata.LogMetadata; -import com.twitter.distributedlog.io.AsyncAbortable; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.logsegment.PerStreamLogSegmentCache; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -/** - * The base class about log handler on managing log segments. - * - * <h3>Metrics</h3> - * The log handler is a base class on managing log segments. so all the metrics - * here are related to log segments retrieval and exposed under `logsegments`. - * These metrics are all OpStats, in the format of <code>`scope`/logsegments/`op`</code>. - * <p> - * Those operations are: - * <ul> - * <li>get_inprogress_segment: time between the inprogress log segment created and - * the handler read it. - * <li>get_completed_segment: time between a log segment is turned to completed and - * the handler read it. - * <li>negative_get_inprogress_segment: record the negative values for `get_inprogress_segment`. - * <li>negative_get_completed_segment: record the negative values for `get_completed_segment`. - * <li>recover_last_entry: recovering last entry from a log segment - * <li>recover_scanned_entries: the number of entries that are scanned during recovering. - * </ul> - * @see BKLogWriteHandler - * @see BKLogReadHandler - */ -public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable { - static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class); - - protected final LogMetadata logMetadata; - protected final DistributedLogConfiguration conf; - protected final LogStreamMetadataStore streamMetadataStore; - protected final LogSegmentMetadataStore metadataStore; - protected final LogSegmentMetadataCache metadataCache; - protected final LogSegmentEntryStore entryStore; - protected final int firstNumEntriesPerReadLastRecordScan; - protected final int maxNumEntriesPerReadLastRecordScan; - protected volatile long lastLedgerRollingTimeMillis = -1; - protected final OrderedScheduler scheduler; - protected final StatsLogger statsLogger; - protected final AlertStatsLogger alertStatsLogger; - protected volatile boolean reportGetSegmentStats = false; - private final String lockClientId; - protected final AtomicReference<IOException> metadataException = new AtomicReference<IOException>(null); - - // Maintain the list of log segments per stream - protected final PerStreamLogSegmentCache logSegmentCache; - - // trace - protected final long metadataLatencyWarnThresholdMillis; - - // Stats - private final OpStatsLogger getInprogressSegmentStat; - private final OpStatsLogger getCompletedSegmentStat; - private final OpStatsLogger negativeGetInprogressSegmentStat; - private final OpStatsLogger negativeGetCompletedSegmentStat; - private final OpStatsLogger recoverLastEntryStats; - private final OpStatsLogger recoverScannedEntriesStats; - - /** - * Construct a Bookkeeper journal manager. - */ - BKLogHandler(LogMetadata metadata, - DistributedLogConfiguration conf, - LogStreamMetadataStore streamMetadataStore, - LogSegmentMetadataCache metadataCache, - LogSegmentEntryStore entryStore, - OrderedScheduler scheduler, - StatsLogger statsLogger, - AlertStatsLogger alertStatsLogger, - String lockClientId) { - this.logMetadata = metadata; - this.conf = conf; - this.scheduler = scheduler; - this.statsLogger = statsLogger; - this.alertStatsLogger = alertStatsLogger; - this.logSegmentCache = new PerStreamLogSegmentCache( - metadata.getLogName(), - conf.isLogSegmentSequenceNumberValidationEnabled()); - firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan(); - maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan(); - this.streamMetadataStore = streamMetadataStore; - this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore(); - this.metadataCache = metadataCache; - this.entryStore = entryStore; - this.lockClientId = lockClientId; - - // Traces - this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis(); - - // Stats - StatsLogger segmentsLogger = statsLogger.scope("logsegments"); - getInprogressSegmentStat = segmentsLogger.getOpStatsLogger("get_inprogress_segment"); - getCompletedSegmentStat = segmentsLogger.getOpStatsLogger("get_completed_segment"); - negativeGetInprogressSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment"); - negativeGetCompletedSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_completed_segment"); - recoverLastEntryStats = segmentsLogger.getOpStatsLogger("recover_last_entry"); - recoverScannedEntriesStats = segmentsLogger.getOpStatsLogger("recover_scanned_entries"); - } - - BKLogHandler checkMetadataException() throws IOException { - if (null != metadataException.get()) { - throw metadataException.get(); - } - return this; - } - - public void reportGetSegmentStats(boolean enabled) { - this.reportGetSegmentStats = enabled; - } - - public String getLockClientId() { - return lockClientId; - } - - public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) - .addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null - ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) { - if (ledgerList.getValue().isEmpty()) { - promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); - return; - } - Future<LogRecordWithDLSN> firstRecord = null; - for (LogSegmentMetadata ledger : ledgerList.getValue()) { - if (!ledger.isTruncated() && (ledger.getRecordCount() > 0 || ledger.isInProgress())) { - firstRecord = asyncReadFirstUserRecord(ledger, DLSN.InitialDLSN); - break; - } - } - if (null != firstRecord) { - promise.become(firstRecord); - } else { - promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); - } - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }); - return promise; - } - - public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); - streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) - .addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - readLogSegmentsFromStore( - LogSegmentMetadata.DESC_COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null - ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) { - if (ledgerList.getValue().isEmpty()) { - promise.setException( - new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); - return; - } - asyncGetLastLogRecord( - ledgerList.getValue().iterator(), - promise, - recover, - false, - includeEndOfStream); - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }); - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - }); - return promise; - } - - private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> ledgerIter, - final Promise<LogRecordWithDLSN> promise, - final boolean fence, - final boolean includeControlRecord, - final boolean includeEndOfStream) { - if (ledgerIter.hasNext()) { - LogSegmentMetadata metadata = ledgerIter.next(); - asyncReadLastRecord(metadata, fence, includeControlRecord, includeEndOfStream).addEventListener( - new FutureEventListener<LogRecordWithDLSN>() { - @Override - public void onSuccess(LogRecordWithDLSN record) { - if (null == record) { - asyncGetLastLogRecord(ledgerIter, promise, fence, includeControlRecord, includeEndOfStream); - } else { - promise.setValue(record); - } - } - - @Override - public void onFailure(Throwable cause) { - promise.setException(cause); - } - } - ); - } else { - promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records")); - } - } - - private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) { - return ReadUtils.asyncReadFirstUserRecord( - getFullyQualifiedName(), - ledger, - firstNumEntriesPerReadLastRecordScan, - maxNumEntriesPerReadLastRecordScan, - new AtomicInteger(0), - scheduler, - entryStore, - beginDLSN - ); - } - - /** - * This is a helper method to compactly return the record count between two records, the first denoted by - * beginDLSN and the second denoted by endPosition. Its up to the caller to ensure that endPosition refers to - * position in the same ledger as beginDLSN. - */ - private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, final DLSN beginDLSN, final long endPosition) { - return asyncReadFirstUserRecord(ledger, beginDLSN).map(new Function<LogRecordWithDLSN, Long>() { - public Long apply(final LogRecordWithDLSN beginRecord) { - long recordCount = 0; - if (null != beginRecord) { - recordCount = endPosition + 1 - beginRecord.getLastPositionWithinLogSegment(); - } - return recordCount; - } - }); - } - - /** - * Ledger metadata tells us how many records are in each completed segment, but for the first and last segments - * we may have to crack open the entry and count. For the first entry, we need to do so because beginDLSN may be - * an interior entry. For the last entry, if it is inprogress, we need to recover it and find the last user - * entry. - */ - private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata ledger, final DLSN beginDLSN) { - if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) { - return asyncReadLastUserRecord(ledger).flatMap(new Function<LogRecordWithDLSN, Future<Long>>() { - public Future<Long> apply(final LogRecordWithDLSN endRecord) { - if (null != endRecord) { - return asyncGetLogRecordCount(ledger, beginDLSN, endRecord.getLastPositionWithinLogSegment() /* end position */); - } else { - return Future.value((long) 0); - } - } - }); - } else if (ledger.isInProgress()) { - return asyncReadLastUserRecord(ledger).map(new Function<LogRecordWithDLSN, Long>() { - public Long apply(final LogRecordWithDLSN endRecord) { - if (null != endRecord) { - return (long) endRecord.getLastPositionWithinLogSegment(); - } else { - return (long) 0; - } - } - }); - } else if (ledger.isDLSNinThisSegment(beginDLSN)) { - return asyncGetLogRecordCount(ledger, beginDLSN, ledger.getRecordCount() /* end position */); - } else { - return Future.value((long) ledger.getRecordCount()); - } - } - - /** - * Get a count of records between beginDLSN and the end of the stream. - * - * @param beginDLSN dlsn marking the start of the range - * @return the count of records present in the range - */ - public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) { - return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()) - .flatMap(new Function<Void, Future<Long>>() { - public Future<Long> apply(Void done) { - - return readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null - ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, Future<Long>>() { - public Future<Long> apply(Versioned<List<LogSegmentMetadata>> ledgerList) { - - List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.getValue().size()); - for (LogSegmentMetadata ledger : ledgerList.getValue()) { - if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) { - futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN)); - } - } - return Future.collect(futureCounts).map(new Function<List<Long>, Long>() { - public Long apply(List<Long> counts) { - return sum(counts); - } - }); - } - }); - } - }); - } - - private Long sum(List<Long> values) { - long sum = 0; - for (Long value : values) { - sum += value; - } - return sum; - } - - @Override - public Future<Void> asyncAbort() { - return asyncClose(); - } - - public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) { - return asyncReadLastRecord(l, false, false, false); - } - - public Future<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l, - final boolean fence, - final boolean includeControl, - final boolean includeEndOfStream) { - final AtomicInteger numRecordsScanned = new AtomicInteger(0); - final Stopwatch stopwatch = Stopwatch.createStarted(); - return ReadUtils.asyncReadLastRecord( - getFullyQualifiedName(), - l, - fence, - includeControl, - includeEndOfStream, - firstNumEntriesPerReadLastRecordScan, - maxNumEntriesPerReadLastRecordScan, - numRecordsScanned, - scheduler, - entryStore - ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() { - @Override - public void onSuccess(LogRecordWithDLSN value) { - recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get()); - } - - @Override - public void onFailure(Throwable cause) { - recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - }); - } - - protected void setLastLedgerRollingTimeMillis(long rollingTimeMillis) { - if (lastLedgerRollingTimeMillis < rollingTimeMillis) { - lastLedgerRollingTimeMillis = rollingTimeMillis; - } - } - - public String getFullyQualifiedName() { - return logMetadata.getFullyQualifiedName(); - } - - // Log Segments Related Functions - // - // ***Note*** - // Get log segment list should go through #getCachedLogSegments as we need to assign start sequence id - // for inprogress log segment so the reader could generate the right sequence id. - // - // ***PerStreamCache vs LogSegmentMetadataCache ** - // The per stream cache maintains the list of segments per stream, while the metadata cache - // maintains log segments. The metadata cache is just to reduce the access to zookeeper, it is - // okay that some of the log segments are not in the cache; however the per stream cache can not - // have any gaps between log segment sequence numbers which it has to be accurate. - - /** - * Get the cached log segments. - * - * @param comparator the comparator to sort the returned log segments. - * @return list of sorted log segments - * @throws UnexpectedException if unexpected condition detected. - */ - protected List<LogSegmentMetadata> getCachedLogSegments(Comparator<LogSegmentMetadata> comparator) - throws UnexpectedException { - try { - return logSegmentCache.getLogSegments(comparator); - } catch (UnexpectedException ue) { - // the log segments cache went wrong - LOG.error("Unexpected exception on getting log segments from the cache for stream {}", - getFullyQualifiedName(), ue); - metadataException.compareAndSet(null, ue); - throw ue; - } - } - - /** - * Add the segment <i>metadata</i> for <i>name</i> in the cache. - * - * @param name - * segment znode name. - * @param metadata - * segment metadata. - */ - protected void addLogSegmentToCache(String name, LogSegmentMetadata metadata) { - metadataCache.put(metadata.getZkPath(), metadata); - logSegmentCache.add(name, metadata); - // update the last ledger rolling time - if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis < metadata.getCompletionTime())) { - lastLedgerRollingTimeMillis = metadata.getCompletionTime(); - } - - if (reportGetSegmentStats) { - // update stats - long ts = System.currentTimeMillis(); - if (metadata.isInProgress()) { - // as we used timestamp as start tx id we could take it as start time - // NOTE: it is a hack here. - long elapsedMillis = ts - metadata.getFirstTxId(); - long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis); - if (elapsedMicroSec > 0) { - if (elapsedMillis > metadataLatencyWarnThresholdMillis) { - LOG.warn("{} received inprogress log segment in {} millis: {}", - new Object[] { getFullyQualifiedName(), elapsedMillis, metadata }); - } - getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec); - } else { - negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec); - } - } else { - long elapsedMillis = ts - metadata.getCompletionTime(); - long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis); - if (elapsedMicroSec > 0) { - if (elapsedMillis > metadataLatencyWarnThresholdMillis) { - LOG.warn("{} received completed log segment in {} millis : {}", - new Object[] { getFullyQualifiedName(), elapsedMillis, metadata }); - } - getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec); - } else { - negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec); - } - } - } - } - - /** - * Read log segment <i>name</i> from the cache. - * - * @param name name of the log segment - * @return log segment metadata - */ - protected LogSegmentMetadata readLogSegmentFromCache(String name) { - return logSegmentCache.get(name); - } - - /** - * Remove the log segment <i>name</i> from the cache. - * - * @param name name of the log segment. - * @return log segment metadata - */ - protected LogSegmentMetadata removeLogSegmentFromCache(String name) { - metadataCache.invalidate(name); - return logSegmentCache.remove(name); - } - - /** - * Update the log segment cache with updated mapping - * - * @param logSegmentsRemoved log segments removed - * @param logSegmentsAdded log segments added - */ - protected void updateLogSegmentCache(Set<String> logSegmentsRemoved, - Map<String, LogSegmentMetadata> logSegmentsAdded) { - for (String segmentName : logSegmentsRemoved) { - metadataCache.invalidate(segmentName); - } - for (Map.Entry<String, LogSegmentMetadata> entry : logSegmentsAdded.entrySet()) { - metadataCache.put(entry.getKey(), entry.getValue()); - } - logSegmentCache.update(logSegmentsRemoved, logSegmentsAdded); - } - - /** - * Read the log segments from the store and register a listener - * @param comparator - * @param segmentFilter - * @param logSegmentNamesListener - * @return future represents the result of log segments - */ - public Future<Versioned<List<LogSegmentMetadata>>> readLogSegmentsFromStore( - final Comparator<LogSegmentMetadata> comparator, - final LogSegmentFilter segmentFilter, - final LogSegmentNamesListener logSegmentNamesListener) { - final Promise<Versioned<List<LogSegmentMetadata>>> readResult = - new Promise<Versioned<List<LogSegmentMetadata>>>(); - metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), logSegmentNamesListener) - .addEventListener(new FutureEventListener<Versioned<List<String>>>() { - @Override - public void onFailure(Throwable cause) { - FutureUtils.setException(readResult, cause); - } - - @Override - public void onSuccess(Versioned<List<String>> logSegmentNames) { - readLogSegmentsFromStore(logSegmentNames, comparator, segmentFilter, readResult); - } - }); - return readResult; - } - - protected void readLogSegmentsFromStore(final Versioned<List<String>> logSegmentNames, - final Comparator<LogSegmentMetadata> comparator, - final LogSegmentFilter segmentFilter, - final Promise<Versioned<List<LogSegmentMetadata>>> readResult) { - Set<String> segmentsReceived = new HashSet<String>(); - segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue())); - Set<String> segmentsAdded; - final Set<String> removedSegments = Collections.synchronizedSet(new HashSet<String>()); - final Map<String, LogSegmentMetadata> addedSegments = - Collections.synchronizedMap(new HashMap<String, LogSegmentMetadata>()); - Pair<Set<String>, Set<String>> segmentChanges = logSegmentCache.diff(segmentsReceived); - segmentsAdded = segmentChanges.getLeft(); - removedSegments.addAll(segmentChanges.getRight()); - - if (segmentsAdded.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("No segments added for {}.", getFullyQualifiedName()); - } - - // update the cache before #getCachedLogSegments to return - updateLogSegmentCache(removedSegments, addedSegments); - - List<LogSegmentMetadata> segmentList; - try { - segmentList = getCachedLogSegments(comparator); - } catch (UnexpectedException e) { - FutureUtils.setException(readResult, e); - return; - } - - FutureUtils.setValue(readResult, - new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion())); - return; - } - - final AtomicInteger numChildren = new AtomicInteger(segmentsAdded.size()); - final AtomicInteger numFailures = new AtomicInteger(0); - for (final String segment: segmentsAdded) { - String logSegmentPath = logMetadata.getLogSegmentPath(segment); - LogSegmentMetadata cachedSegment = metadataCache.get(logSegmentPath); - if (null != cachedSegment) { - addedSegments.put(segment, cachedSegment); - completeReadLogSegmentsFromStore( - removedSegments, - addedSegments, - comparator, - readResult, - logSegmentNames.getVersion(), - numChildren, - numFailures); - continue; - } - metadataStore.getLogSegment(logSegmentPath) - .addEventListener(new FutureEventListener<LogSegmentMetadata>() { - - @Override - public void onSuccess(LogSegmentMetadata result) { - addedSegments.put(segment, result); - complete(); - } - - @Override - public void onFailure(Throwable cause) { - // LogSegmentNotFoundException exception is possible in two cases - // 1. A log segment was deleted by truncation between the call to getChildren and read - // attempt on the znode corresponding to the segment - // 2. In progress segment has been completed => inprogress ZNode does not exist - if (cause instanceof LogSegmentNotFoundException) { - removedSegments.add(segment); - complete(); - } else { - // fail fast - if (1 == numFailures.incrementAndGet()) { - FutureUtils.setException(readResult, cause); - return; - } - } - } - - private void complete() { - completeReadLogSegmentsFromStore( - removedSegments, - addedSegments, - comparator, - readResult, - logSegmentNames.getVersion(), - numChildren, - numFailures); - } - }); - } - } - - private void completeReadLogSegmentsFromStore(final Set<String> removedSegments, - final Map<String, LogSegmentMetadata> addedSegments, - final Comparator<LogSegmentMetadata> comparator, - final Promise<Versioned<List<LogSegmentMetadata>>> readResult, - final Version logSegmentNamesVersion, - final AtomicInteger numChildren, - final AtomicInteger numFailures) { - if (0 != numChildren.decrementAndGet()) { - return; - } - if (numFailures.get() > 0) { - return; - } - // update the cache only when fetch completed and before #getCachedLogSegments - updateLogSegmentCache(removedSegments, addedSegments); - List<LogSegmentMetadata> segmentList; - try { - segmentList = getCachedLogSegments(comparator); - } catch (UnexpectedException e) { - FutureUtils.setException(readResult, e); - return; - } - FutureUtils.setValue(readResult, - new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion)); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java deleted file mode 100644 index 8aa00e7..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ /dev/null @@ -1,431 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.TimeUnit; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.twitter.distributedlog.callback.LogSegmentListener; -import com.twitter.distributedlog.callback.LogSegmentNamesListener; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.DLIllegalStateException; -import com.twitter.distributedlog.exceptions.LockingException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.metadata.LogMetadataForReader; -import com.twitter.distributedlog.lock.DistributedLock; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import com.twitter.util.Return; -import com.twitter.util.Throw; -import com.twitter.util.Try; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.SafeRunnable; -import org.apache.bookkeeper.versioning.Version; -import org.apache.bookkeeper.versioning.Versioned; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import javax.annotation.Nullable; - -/** - * Log Handler for Readers. - * <h3>Metrics</h3> - * - * <h4>ReadAhead Worker</h4> - * Most of readahead stats are exposed under scope `readahead_worker`. Only readahead exceptions are exposed - * in parent scope via <code>readAheadExceptionsLogger</code>. - * <ul> - * <li> `readahead_worker`/wait: counter. number of waits that readahead worker is waiting. If this keeps increasing, - * it usually means readahead keep getting full because of reader slows down reading. - * <li> `readahead_worker`/repositions: counter. number of repositions that readhead worker encounters. reposition - * means that a readahead worker finds that it isn't advancing to a new log segment and force re-positioning. - * <li> `readahead_worker`/entry_piggy_back_hits: counter. it increases when the last add confirmed being advanced - * because of the piggy-back lac. - * <li> `readahead_worker`/entry_piggy_back_misses: counter. it increases when the last add confirmed isn't advanced - * by a read entry because it doesn't piggy back a newer lac. - * <li> `readahead_worker`/read_entries: opstats. stats on number of entries read per readahead read batch. - * <li> `readahead_worker`/read_lac_counter: counter. stats on the number of readLastConfirmed operations - * <li> `readahead_worker`/read_lac_and_entry_counter: counter. stats on the number of readLastConfirmedAndEntry - * operations. - * <li> `readahead_worker`/cache_full: counter. it increases each time readahead worker finds cache become full. - * If it keeps increasing, that means reader slows down reading. - * <li> `readahead_worker`/resume: opstats. stats on readahead worker resuming reading from wait state. - * <li> `readahead_worker`/read_lac_lag: opstats. stats on the number of entries diff between the lac reader knew - * last time and the lac that it received. if `lag` between two subsequent lacs is high, that might means delay - * might be high. because reader is only allowed to read entries after lac is advanced. - * <li> `readahead_worker`/long_poll_interruption: opstats. stats on the number of interruptions happened to long - * poll. the interruptions are usually because of receiving zookeeper notifications. - * <li> `readahead_worker`/notification_execution: opstats. stats on executions over the notifications received from - * zookeeper. - * <li> `readahead_worker`/metadata_reinitialization: opstats. stats on metadata reinitialization after receiving - * notifcation from log segments updates. - * <li> `readahead_worker`/idle_reader_warn: counter. it increases each time the readahead worker detects itself - * becoming idle. - * </ul> - * <h4>Read Lock</h4> - * All read lock related stats are exposed under scope `read_lock`. - * for detail stats. - */ -class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { - static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); - - protected final LogMetadataForReader logMetadataForReader; - - protected final DynamicDistributedLogConfiguration dynConf; - - private final Optional<String> subscriberId; - private DistributedLock readLock; - private Future<Void> lockAcquireFuture; - - // notify the state change about the read handler - protected final AsyncNotification readerStateNotification; - - // log segments listener - protected boolean logSegmentsNotificationDisabled = false; - protected final CopyOnWriteArraySet<LogSegmentListener> listeners = - new CopyOnWriteArraySet<LogSegmentListener>(); - protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments = - new Versioned<List<LogSegmentMetadata>>(null, Version.NEW); - - // stats - private final StatsLogger perLogStatsLogger; - - /** - * Construct a Bookkeeper journal manager. - */ - BKLogReadHandler(LogMetadataForReader logMetadata, - Optional<String> subscriberId, - DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - LogStreamMetadataStore streamMetadataStore, - LogSegmentMetadataCache metadataCache, - LogSegmentEntryStore entryStore, - OrderedScheduler scheduler, - AlertStatsLogger alertStatsLogger, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - String clientId, - AsyncNotification readerStateNotification, - boolean isHandleForReading) { - super(logMetadata, - conf, - streamMetadataStore, - metadataCache, - entryStore, - scheduler, - statsLogger, - alertStatsLogger, - clientId); - this.logMetadataForReader = logMetadata; - this.dynConf = dynConf; - this.perLogStatsLogger = - isHandleForReading ? perLogStatsLogger : NullStatsLogger.INSTANCE; - this.readerStateNotification = readerStateNotification; - this.subscriberId = subscriberId; - } - - @VisibleForTesting - String getReadLockPath() { - return logMetadataForReader.getReadLockPath(subscriberId); - } - - <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) { - scheduler.submit(new SafeRunnable() { - @Override - public void safeRun() { - promise.update(result); - } - }); - } - - Future<Void> checkLogStreamExists() { - return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()); - } - - /** - * Elective stream lock--readers are not required to acquire the lock before using the stream. - */ - synchronized Future<Void> lockStream() { - if (null == lockAcquireFuture) { - lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId) - .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() { - @Override - public Future<Void> applyE(DistributedLock lock) throws Throwable { - BKLogReadHandler.this.readLock = lock; - LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath()); - return acquireLockOnExecutorThread(lock); - } - }); - } - return lockAcquireFuture; - } - - /** - * Begin asynchronous lock acquire, but ensure that the returned future is satisfied on an - * executor service thread. - */ - Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { - final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire(); - - // The future we return must be satisfied on an executor service thread. If we simply - // return the future returned by asyncAcquire, user callbacks may end up running in - // the lock state executor thread, which will cause deadlocks and introduce latency - // etc. - final Promise<Void> threadAcquirePromise = new Promise<Void>(); - threadAcquirePromise.setInterruptHandler(new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable t) { - FutureUtils.cancel(acquireFuture); - return null; - } - }); - acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() { - @Override - public void onSuccess(DistributedLock lock) { - LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath()); - satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null)); - } - - @Override - public void onFailure(Throwable cause) { - LOG.info("failed to acquire readlock {} at {}", - new Object[]{ getLockClientId(), getReadLockPath(), cause }); - satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause)); - } - }); - return threadAcquirePromise; - } - - /** - * Check ownership of elective stream lock. - */ - void checkReadLock() throws DLIllegalStateException, LockingException { - synchronized (this) { - if ((null == lockAcquireFuture) || - (!lockAcquireFuture.isDefined())) { - throw new DLIllegalStateException("Attempt to check for lock before it has been acquired successfully"); - } - } - - readLock.checkOwnership(); - } - - public Future<Void> asyncClose() { - DistributedLock lockToClose; - synchronized (this) { - if (null != lockAcquireFuture && !lockAcquireFuture.isDefined()) { - FutureUtils.cancel(lockAcquireFuture); - } - lockToClose = readLock; - } - return Utils.closeSequence(scheduler, lockToClose) - .flatMap(new AbstractFunction1<Void, Future<Void>>() { - @Override - public Future<Void> apply(Void result) { - // unregister the log segment listener - metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this); - return Future.Void(); - } - }); - } - - @Override - public Future<Void> asyncAbort() { - return asyncClose(); - } - - /** - * Start fetch the log segments and register the {@link LogSegmentNamesListener}. - * The future is satisfied only on a successful fetch or encountered a fatal failure. - * - * @return future represents the fetch result - */ - Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() { - Promise<Versioned<List<LogSegmentMetadata>>> promise = - new Promise<Versioned<List<LogSegmentMetadata>>>(); - asyncStartFetchLogSegments(promise); - return promise; - } - - void asyncStartFetchLogSegments(final Promise<Versioned<List<LogSegmentMetadata>>> promise) { - readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - this).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - @Override - public void onFailure(Throwable cause) { - if (cause instanceof LogNotFoundException || - cause instanceof LogSegmentNotFoundException || - cause instanceof UnexpectedException) { - // indicate some inconsistent behavior, abort - metadataException.compareAndSet(null, (IOException) cause); - // notify the reader that read handler is in error state - notifyReaderOnError(cause); - FutureUtils.setException(promise, cause); - return; - } - scheduler.schedule(new Runnable() { - @Override - public void run() { - asyncStartFetchLogSegments(promise); - } - }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { - // no-op - FutureUtils.setValue(promise, segments); - } - }); - } - - @VisibleForTesting - void disableReadAheadLogSegmentsNotification() { - logSegmentsNotificationDisabled = true; - } - - @Override - public void onSegmentsUpdated(final Versioned<List<String>> segments) { - synchronized (this) { - if (lastNotifiedLogSegments.getVersion() != Version.NEW && - lastNotifiedLogSegments.getVersion().compare(segments.getVersion()) != Version.Occurred.BEFORE) { - // the log segments has been read, and it is possibly a retry from last segments update - return; - } - } - - Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise = - new Promise<Versioned<List<LogSegmentMetadata>>>(); - readLogSegmentsPromise.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { - @Override - public void onFailure(Throwable cause) { - if (cause instanceof LogNotFoundException || - cause instanceof LogSegmentNotFoundException || - cause instanceof UnexpectedException) { - // indicate some inconsistent behavior, abort - metadataException.compareAndSet(null, (IOException) cause); - // notify the reader that read handler is in error state - notifyReaderOnError(cause); - return; - } - scheduler.schedule(new Runnable() { - @Override - public void run() { - onSegmentsUpdated(segments); - } - }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public void onSuccess(Versioned<List<LogSegmentMetadata>> logSegments) { - List<LogSegmentMetadata> segmentsToNotify = null; - synchronized (BKLogReadHandler.this) { - Versioned<List<LogSegmentMetadata>> lastLogSegments = lastNotifiedLogSegments; - if (lastLogSegments.getVersion() == Version.NEW || - lastLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) { - lastNotifiedLogSegments = logSegments; - segmentsToNotify = logSegments.getValue(); - } - } - if (null != segmentsToNotify) { - notifyUpdatedLogSegments(segmentsToNotify); - } - } - }); - // log segments list is updated, read their metadata - readLogSegmentsFromStore( - segments, - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - readLogSegmentsPromise); - } - - @Override - public void onLogStreamDeleted() { - notifyLogStreamDeleted(); - } - - // - // Listener for log segments - // - - protected void registerListener(@Nullable LogSegmentListener listener) { - if (null != listener) { - listeners.add(listener); - } - } - - protected void unregisterListener(@Nullable LogSegmentListener listener) { - if (null != listener) { - listeners.remove(listener); - } - } - - protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) { - if (logSegmentsNotificationDisabled) { - return; - } - - for (LogSegmentListener listener : listeners) { - List<LogSegmentMetadata> listToReturn = - new ArrayList<LogSegmentMetadata>(segments); - Collections.sort(listToReturn, LogSegmentMetadata.COMPARATOR); - listener.onSegmentsUpdated(listToReturn); - } - } - - protected void notifyLogStreamDeleted() { - if (logSegmentsNotificationDisabled) { - return; - } - - for (LogSegmentListener listener : listeners) { - listener.onLogStreamDeleted(); - } - } - - // notify the errors - protected void notifyReaderOnError(Throwable cause) { - if (null != readerStateNotification) { - readerStateNotification.notifyOnError(cause); - } - } -}