http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java new file mode 100644 index 0000000..07ae0ff --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java @@ -0,0 +1,715 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.callback.LogSegmentNamesListener; +import org.apache.distributedlog.exceptions.LogEmptyException; +import org.apache.distributedlog.exceptions.LogSegmentNotFoundException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.logsegment.LogSegmentEntryStore; +import org.apache.distributedlog.metadata.LogMetadata; +import org.apache.distributedlog.io.AsyncAbortable; +import org.apache.distributedlog.io.AsyncCloseable; +import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; +import org.apache.distributedlog.logsegment.PerStreamLogSegmentCache; +import org.apache.distributedlog.logsegment.LogSegmentFilter; +import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; +import org.apache.distributedlog.metadata.LogStreamMetadataStore; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Version; +import org.apache.bookkeeper.versioning.Versioned; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The base class about log 
 * handler on managing log segments.
 *
 * <h3>Metrics</h3>
 * The log handler is a base class on managing log segments. so all the metrics
 * here are related to log segments retrieval and exposed under `logsegments`.
 * These metrics are all OpStats, in the format of <code>`scope`/logsegments/`op`</code>.
 * <p>
 * Those operations are:
 * <ul>
 * <li>get_inprogress_segment: time between the inprogress log segment created and
 * the handler read it.
 * <li>get_completed_segment: time between a log segment is turned to completed and
 * the handler read it.
 * <li>negative_get_inprogress_segment: record the negative values for `get_inprogress_segment`.
 * <li>negative_get_completed_segment: record the negative values for `get_completed_segment`.
 * <li>recover_last_entry: recovering last entry from a log segment
 * <li>recover_scanned_entries: the number of entries that are scanned during recovering.
 * </ul>
 * @see BKLogWriteHandler
 * @see BKLogReadHandler
 */
public abstract class BKLogHandler implements AsyncCloseable, AsyncAbortable {
    static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);

    // Immutable identity/configuration of the log stream this handler manages.
    protected final LogMetadata logMetadata;
    protected final DistributedLogConfiguration conf;
    protected final LogStreamMetadataStore streamMetadataStore;
    protected final LogSegmentMetadataStore metadataStore;
    protected final LogSegmentMetadataCache metadataCache;
    protected final LogSegmentEntryStore entryStore;
    // Scan batch sizes used when searching a segment for its first/last record.
    protected final int firstNumEntriesPerReadLastRecordScan;
    protected final int maxNumEntriesPerReadLastRecordScan;
    // Completion time of the most recently completed segment seen; -1 until one is observed.
    protected volatile long lastLedgerRollingTimeMillis = -1;
    protected final OrderedScheduler scheduler;
    protected final StatsLogger statsLogger;
    protected final AlertStatsLogger alertStatsLogger;
    // Toggles per-segment latency stats collection in addLogSegmentToCache.
    protected volatile boolean reportGetSegmentStats = false;
    private final String lockClientId;
    // First unexpected cache failure is latched here and rethrown by checkMetadataException.
    protected final AtomicReference<IOException> metadataException = new AtomicReference<IOException>(null);

    // Maintain the list of log segments per stream
    protected final PerStreamLogSegmentCache logSegmentCache;

    // trace
    protected final long metadataLatencyWarnThresholdMillis;

    // Stats
    private final OpStatsLogger getInprogressSegmentStat;
    private final OpStatsLogger getCompletedSegmentStat;
    private final OpStatsLogger negativeGetInprogressSegmentStat;
    private final OpStatsLogger negativeGetCompletedSegmentStat;
    private final OpStatsLogger recoverLastEntryStats;
    private final OpStatsLogger recoverScannedEntriesStats;

    /**
     * Construct a log handler over the given log stream.
     * (Original comment said "Bookkeeper journal manager"; this is the shared
     * base for the read/write handlers.)
     */
    BKLogHandler(LogMetadata metadata,
                 DistributedLogConfiguration conf,
                 LogStreamMetadataStore streamMetadataStore,
                 LogSegmentMetadataCache metadataCache,
                 LogSegmentEntryStore entryStore,
                 OrderedScheduler scheduler,
                 StatsLogger statsLogger,
                 AlertStatsLogger alertStatsLogger,
                 String lockClientId) {
        this.logMetadata = metadata;
        this.conf = conf;
        this.scheduler = scheduler;
        this.statsLogger = statsLogger;
        this.alertStatsLogger = alertStatsLogger;
        this.logSegmentCache = new PerStreamLogSegmentCache(
                metadata.getLogName(),
                conf.isLogSegmentSequenceNumberValidationEnabled());
        firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan();
        maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan();
        this.streamMetadataStore = streamMetadataStore;
        this.metadataStore = streamMetadataStore.getLogSegmentMetadataStore();
        this.metadataCache = metadataCache;
        this.entryStore = entryStore;
        this.lockClientId = lockClientId;

        // Traces
        this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis();

        // Stats
        StatsLogger segmentsLogger = statsLogger.scope("logsegments");
        getInprogressSegmentStat = segmentsLogger.getOpStatsLogger("get_inprogress_segment");
        getCompletedSegmentStat = segmentsLogger.getOpStatsLogger("get_completed_segment");
        negativeGetInprogressSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment");
        negativeGetCompletedSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_completed_segment");
        recoverLastEntryStats = segmentsLogger.getOpStatsLogger("recover_last_entry");
        recoverScannedEntriesStats = segmentsLogger.getOpStatsLogger("recover_scanned_entries");
    }

    /**
     * Rethrow the first metadata exception latched by {@link #getCachedLogSegments},
     * if any; otherwise return {@code this} for chaining.
     *
     * @throws IOException the latched metadata exception, if one was recorded
     */
    BKLogHandler checkMetadataException() throws IOException {
        if (null != metadataException.get()) {
            throw metadataException.get();
        }
        return this;
    }

    /** Enable or disable per-segment retrieval latency stats (see addLogSegmentToCache). */
    public void reportGetSegmentStats(boolean enabled) {
        this.reportGetSegmentStats = enabled;
    }

    /** @return the client id used for locking this log stream. */
    public String getLockClientId() {
        return lockClientId;
    }

    /**
     * Asynchronously retrieve the first user record of the log.
     * <p>
     * Checks log existence, lists log segments in ascending order, and reads the
     * first user record from the first non-truncated segment that has records
     * (or is in progress).
     *
     * @return future satisfied with the first record, or failed with
     *         {@link LogEmptyException} if the log has no records.
     */
    public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
                .addEventListener(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                readLogSegmentsFromStore(
                        LogSegmentMetadata.COMPARATOR,
                        LogSegmentFilter.DEFAULT_FILTER,
                        null
                ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {

                    @Override
                    public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) {
                        if (ledgerList.getValue().isEmpty()) {
                            promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
                            return;
                        }
                        Future<LogRecordWithDLSN> firstRecord = null;
                        // Pick the first segment that can actually contain a user record.
                        for (LogSegmentMetadata ledger : ledgerList.getValue()) {
                            if (!ledger.isTruncated() && (ledger.getRecordCount() > 0 || ledger.isInProgress())) {
                                firstRecord = asyncReadFirstUserRecord(ledger, DLSN.InitialDLSN);
                                break;
                            }
                        }
                        if (null != firstRecord) {
                            promise.become(firstRecord);
                        } else {
                            promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
                        }
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        promise.setException(cause);
                    }
                });
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.setException(cause);
            }
        });
        return promise;
    }

    /**
     * Asynchronously retrieve the last record of the log.
     * <p>
     * Lists segments in descending order and walks them (newest first) via
     * {@link #asyncGetLastLogRecord} until a record is found.
     *
     * @param recover whether to fence/recover the inprogress segment while reading
     * @param includeEndOfStream whether the end-of-stream control record counts as a record
     * @return future satisfied with the last record, or failed with
     *         {@link LogEmptyException} if the log has no records.
     */
    public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) {
        final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
        streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
                .addEventListener(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                readLogSegmentsFromStore(
                        LogSegmentMetadata.DESC_COMPARATOR,
                        LogSegmentFilter.DEFAULT_FILTER,
                        null
                ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {

                    @Override
                    public void onSuccess(Versioned<List<LogSegmentMetadata>> ledgerList) {
                        if (ledgerList.getValue().isEmpty()) {
                            promise.setException(
                                    new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
                            return;
                        }
                        asyncGetLastLogRecord(
                                ledgerIterOf(ledgerList),
                                promise,
                                recover,
                                false,
                                includeEndOfStream);
                    }

                    // helper to keep the call above readable is NOT introduced here:
                    // (doc-only review; code below is unchanged)
                    private Iterator<LogSegmentMetadata> ledgerIterOf(Versioned<List<LogSegmentMetadata>> l) {
                        return l.getValue().iterator();
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        promise.setException(cause);
                    }
                });
            }

            @Override
            public void onFailure(Throwable cause) {
                promise.setException(cause);
            }
        });
        return promise;
    }

    /**
     * Recursively scan segments (in the iterator's order) for the last record,
     * satisfying {@code promise} with the first non-null result, or failing with
     * {@link LogEmptyException} when the iterator is exhausted.
     */
    private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> ledgerIter,
                                       final Promise<LogRecordWithDLSN> promise,
                                       final boolean fence,
                                       final boolean includeControlRecord,
                                       final boolean includeEndOfStream) {
        if (ledgerIter.hasNext()) {
            LogSegmentMetadata metadata = ledgerIter.next();
            asyncReadLastRecord(metadata, fence, includeControlRecord, includeEndOfStream).addEventListener(
                    new FutureEventListener<LogRecordWithDLSN>() {
                        @Override
                        public void onSuccess(LogRecordWithDLSN record) {
                            if (null == record) {
                                // This segment had no qualifying record; try the next one.
                                asyncGetLastLogRecord(ledgerIter, promise, fence, includeControlRecord, includeEndOfStream);
                            } else {
                                promise.setValue(record);
                            }
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            promise.setException(cause);
                        }
                    }
            );
        } else {
            promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
        }
    }

    /** Read the first user record at or after {@code beginDLSN} within {@code ledger}. */
    private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
        return ReadUtils.asyncReadFirstUserRecord(
                getFullyQualifiedName(),
                ledger,
                firstNumEntriesPerReadLastRecordScan,
                maxNumEntriesPerReadLastRecordScan,
                new AtomicInteger(0),
                scheduler,
                entryStore,
                beginDLSN
        );
    }

    /**
     * This is a helper method to compactly return the record count between two records, the first denoted by
     * beginDLSN and the second denoted by endPosition. It's up to the caller to ensure that endPosition refers to
     * a position in the same ledger as beginDLSN.
     */
    private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, final DLSN beginDLSN, final long endPosition) {
        return asyncReadFirstUserRecord(ledger, beginDLSN).map(new Function<LogRecordWithDLSN, Long>() {
            public Long apply(final LogRecordWithDLSN beginRecord) {
                long recordCount = 0;
                if (null != beginRecord) {
                    recordCount = endPosition + 1 - beginRecord.getLastPositionWithinLogSegment();
                }
                return recordCount;
            }
        });
    }

    /**
     * Ledger metadata tells us how many records are in each completed segment, but for the first and last segments
     * we may have to crack open the entry and count. For the first entry, we need to do so because beginDLSN may be
     * an interior entry. For the last entry, if it is inprogress, we need to recover it and find the last user
     * entry.
     */
    private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata ledger, final DLSN beginDLSN) {
        if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) {
            // inprogress AND contains beginDLSN: recover the end, then count from beginDLSN.
            return asyncReadLastUserRecord(ledger).flatMap(new Function<LogRecordWithDLSN, Future<Long>>() {
                public Future<Long> apply(final LogRecordWithDLSN endRecord) {
                    if (null != endRecord) {
                        return asyncGetLogRecordCount(ledger, beginDLSN, endRecord.getLastPositionWithinLogSegment() /* end position */);
                    } else {
                        return Future.value((long) 0);
                    }
                }
            });
        } else if (ledger.isInProgress()) {
            // inprogress but entirely after beginDLSN: count is the last user record's position.
            return asyncReadLastUserRecord(ledger).map(new Function<LogRecordWithDLSN, Long>() {
                public Long apply(final LogRecordWithDLSN endRecord) {
                    if (null != endRecord) {
                        return (long) endRecord.getLastPositionWithinLogSegment();
                    } else {
                        return (long) 0;
                    }
                }
            });
        } else if (ledger.isDLSNinThisSegment(beginDLSN)) {
            // completed segment containing beginDLSN: count from beginDLSN to the recorded end.
            return asyncGetLogRecordCount(ledger, beginDLSN, ledger.getRecordCount() /* end position */);
        } else {
            // completed segment entirely after beginDLSN: metadata record count is exact.
            return Future.value((long) ledger.getRecordCount());
        }
    }

    /**
     * Get a count of records between beginDLSN and the end of the stream.
     *
     * @param beginDLSN dlsn marking the start of the range
     * @return the count of records present in the range
     */
    public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
        return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName())
                .flatMap(new Function<Void, Future<Long>>() {
                    public Future<Long> apply(Void done) {

                        return readLogSegmentsFromStore(
                                LogSegmentMetadata.COMPARATOR,
                                LogSegmentFilter.DEFAULT_FILTER,
                                null
                        ).flatMap(new Function<Versioned<List<LogSegmentMetadata>>, Future<Long>>() {
                            public Future<Long> apply(Versioned<List<LogSegmentMetadata>> ledgerList) {

                                // Count each segment at/after beginDLSN's segment in parallel, then sum.
                                List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.getValue().size());
                                for (LogSegmentMetadata ledger : ledgerList.getValue()) {
                                    if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) {
                                        futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
                                    }
                                }
                                return Future.collect(futureCounts).map(new Function<List<Long>, Long>() {
                                    public Long apply(List<Long> counts) {
                                        return sum(counts);
                                    }
                                });
                            }
                        });
                    }
                });
    }

    /** Sum a list of per-segment record counts. */
    private Long sum(List<Long> values) {
        long sum = 0;
        for (Long value : values) {
            sum += value;
        }
        return sum;
    }

    /** Aborting a handler is the same as closing it; there is no partial state to discard. */
    @Override
    public Future<Void> asyncAbort() {
        return asyncClose();
    }

    /** Read the last user record of segment {@code l} without fencing or control records. */
    public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) {
        return asyncReadLastRecord(l, false, false, false);
    }

    /**
     * Read the last record of segment {@code l}, recording recover_last_entry /
     * recover_scanned_entries stats around the scan.
     *
     * @param fence whether to fence the segment while reading (recovery)
     * @param includeControl whether control records qualify as the "last record"
     * @param includeEndOfStream whether the end-of-stream marker qualifies
     * @return future of the last qualifying record, or null if none found
     */
    public Future<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l,
                                                         final boolean fence,
                                                         final boolean includeControl,
                                                         final boolean includeEndOfStream) {
        final AtomicInteger numRecordsScanned = new AtomicInteger(0);
        final Stopwatch stopwatch = Stopwatch.createStarted();
        return ReadUtils.asyncReadLastRecord(
                getFullyQualifiedName(),
                l,
                fence,
                includeControl,
                includeEndOfStream,
                firstNumEntriesPerReadLastRecordScan,
                maxNumEntriesPerReadLastRecordScan,
                numRecordsScanned,
                scheduler,
                entryStore
        ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
            @Override
            public void onSuccess(LogRecordWithDLSN value) {
                recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
                // records-scanned count is reported through an OpStatsLogger as a "latency" value
                recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get());
            }

            @Override
            public void onFailure(Throwable cause) {
                recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }
        });
    }

    /** Monotonically advance the last ledger rolling time (never moves backwards). */
    protected void setLastLedgerRollingTimeMillis(long rollingTimeMillis) {
        if (lastLedgerRollingTimeMillis < rollingTimeMillis) {
            lastLedgerRollingTimeMillis = rollingTimeMillis;
        }
    }

    /** @return the fully qualified name of the log stream this handler manages. */
    public String getFullyQualifiedName() {
        return logMetadata.getFullyQualifiedName();
    }

    // Log Segments Related Functions
    //
    // ***Note***
    // Get log segment list should go through #getCachedLogSegments as we need to assign start sequence id
    // for inprogress log segment so the reader could generate the right sequence id.
    //
    // ***PerStreamCache vs LogSegmentMetadataCache ***
    // The per stream cache maintains the list of segments per stream, while the metadata cache
    // maintains log segments. The metadata cache is just to reduce the access to zookeeper; it is
    // okay that some of the log segments are not in the cache, however the per stream cache can not
    // have any gaps between log segment sequence numbers, which it needs to keep accurate.

    /**
     * Get the cached log segments.
     *
     * @param comparator the comparator to sort the returned log segments.
     * @return list of sorted log segments
     * @throws UnexpectedException if unexpected condition detected.
     */
    protected List<LogSegmentMetadata> getCachedLogSegments(Comparator<LogSegmentMetadata> comparator)
            throws UnexpectedException {
        try {
            return logSegmentCache.getLogSegments(comparator);
        } catch (UnexpectedException ue) {
            // the log segments cache went wrong; latch the error so subsequent
            // checkMetadataException() calls fail fast
            LOG.error("Unexpected exception on getting log segments from the cache for stream {}",
                    getFullyQualifiedName(), ue);
            metadataException.compareAndSet(null, ue);
            throw ue;
        }
    }

    /**
     * Add the segment <i>metadata</i> for <i>name</i> in the cache.
     *
     * @param name
     *          segment znode name.
     * @param metadata
     *          segment metadata.
     */
    protected void addLogSegmentToCache(String name, LogSegmentMetadata metadata) {
        // NOTE: the metadata cache is keyed by znode path here; compare with
        // removeLogSegmentFromCache/updateLogSegmentCache which use the segment name.
        metadataCache.put(metadata.getZkPath(), metadata);
        logSegmentCache.add(name, metadata);
        // update the last ledger rolling time
        if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis < metadata.getCompletionTime())) {
            lastLedgerRollingTimeMillis = metadata.getCompletionTime();
        }

        if (reportGetSegmentStats) {
            // update stats
            long ts = System.currentTimeMillis();
            if (metadata.isInProgress()) {
                // as we used timestamp as start tx id we could take it as start time
                // NOTE: it is a hack here.
                long elapsedMillis = ts - metadata.getFirstTxId();
                long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
                if (elapsedMicroSec > 0) {
                    if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
                        LOG.warn("{} received inprogress log segment in {} millis: {}",
                                new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
                    }
                    getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
                } else {
                    // clock skew between writer and reader can yield negative elapsed time
                    negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
                }
            } else {
                long elapsedMillis = ts - metadata.getCompletionTime();
                long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
                if (elapsedMicroSec > 0) {
                    if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
                        LOG.warn("{} received completed log segment in {} millis : {}",
                                new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
                    }
                    getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
                } else {
                    negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
                }
            }
        }
    }

    /**
     * Read log segment <i>name</i> from the cache.
     *
     * @param name name of the log segment
     * @return log segment metadata
     */
    protected LogSegmentMetadata readLogSegmentFromCache(String name) {
        return logSegmentCache.get(name);
    }

    /**
     * Remove the log segment <i>name</i> from the cache.
     *
     * @param name name of the log segment.
     * @return log segment metadata
     */
    protected LogSegmentMetadata removeLogSegmentFromCache(String name) {
        // NOTE(review): invalidates the metadata cache by segment *name*, but
        // addLogSegmentToCache/readLogSegmentsFromStore key it by znode *path*
        // (metadata.getZkPath()). This invalidation likely never matches an
        // entry — TODO confirm intended key against LogSegmentMetadataCache usage.
        metadataCache.invalidate(name);
        return logSegmentCache.remove(name);
    }

    /**
     * Update the log segment cache with updated mapping.
     *
     * @param logSegmentsRemoved log segments removed
     * @param logSegmentsAdded log segments added
     */
    protected void updateLogSegmentCache(Set<String> logSegmentsRemoved,
                                         Map<String, LogSegmentMetadata> logSegmentsAdded) {
        for (String segmentName : logSegmentsRemoved) {
            // NOTE(review): same name-vs-znode-path key mismatch as
            // removeLogSegmentFromCache — TODO confirm.
            metadataCache.invalidate(segmentName);
        }
        for (Map.Entry<String, LogSegmentMetadata> entry : logSegmentsAdded.entrySet()) {
            metadataCache.put(entry.getKey(), entry.getValue());
        }
        logSegmentCache.update(logSegmentsRemoved, logSegmentsAdded);
    }

    /**
     * Read the log segments from the store and register a listener.
     *
     * @param comparator comparator used to sort the returned segment list
     * @param segmentFilter filter applied to the segment names read from the store
     * @param logSegmentNamesListener listener notified of subsequent segment-name changes (nullable)
     * @return future represents the result of log segments
     */
    public Future<Versioned<List<LogSegmentMetadata>>> readLogSegmentsFromStore(
            final Comparator<LogSegmentMetadata> comparator,
            final LogSegmentFilter segmentFilter,
            final LogSegmentNamesListener logSegmentNamesListener) {
        final Promise<Versioned<List<LogSegmentMetadata>>> readResult =
                new Promise<Versioned<List<LogSegmentMetadata>>>();
        metadataStore.getLogSegmentNames(logMetadata.getLogSegmentsPath(), logSegmentNamesListener)
                .addEventListener(new FutureEventListener<Versioned<List<String>>>() {
                    @Override
                    public void onFailure(Throwable cause) {
                        FutureUtils.setException(readResult, cause);
                    }

                    @Override
                    public void onSuccess(Versioned<List<String>> logSegmentNames) {
                        readLogSegmentsFromStore(logSegmentNames, comparator, segmentFilter, readResult);
                    }
                });
        return readResult;
    }

    /**
     * Given the versioned list of segment names, diff against the per-stream cache,
     * fetch metadata for any newly-seen segments, and satisfy {@code readResult}
     * with the sorted, versioned segment list once all fetches complete.
     * Fetch completion is tracked by an atomic countdown in
     * {@link #completeReadLogSegmentsFromStore}.
     */
    protected void readLogSegmentsFromStore(final Versioned<List<String>> logSegmentNames,
                                            final Comparator<LogSegmentMetadata> comparator,
                                            final LogSegmentFilter segmentFilter,
                                            final Promise<Versioned<List<LogSegmentMetadata>>> readResult) {
        Set<String> segmentsReceived = new HashSet<String>();
        segmentsReceived.addAll(segmentFilter.filter(logSegmentNames.getValue()));
        Set<String> segmentsAdded;
        // synchronized: mutated concurrently from the async fetch callbacks below
        final Set<String> removedSegments = Collections.synchronizedSet(new HashSet<String>());
        final Map<String, LogSegmentMetadata> addedSegments =
                Collections.synchronizedMap(new HashMap<String, LogSegmentMetadata>());
        Pair<Set<String>, Set<String>> segmentChanges = logSegmentCache.diff(segmentsReceived);
        segmentsAdded = segmentChanges.getLeft();
        removedSegments.addAll(segmentChanges.getRight());

        if (segmentsAdded.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("No segments added for {}.", getFullyQualifiedName());
            }

            // update the cache before #getCachedLogSegments to return
            updateLogSegmentCache(removedSegments, addedSegments);

            List<LogSegmentMetadata> segmentList;
            try {
                segmentList = getCachedLogSegments(comparator);
            } catch (UnexpectedException e) {
                FutureUtils.setException(readResult, e);
                return;
            }

            FutureUtils.setValue(readResult,
                    new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNames.getVersion()));
            return;
        }

        // countdown of outstanding per-segment fetches; the last one to finish
        // (in completeReadLogSegmentsFromStore) publishes the result
        final AtomicInteger numChildren = new AtomicInteger(segmentsAdded.size());
        final AtomicInteger numFailures = new AtomicInteger(0);
        for (final String segment: segmentsAdded) {
            String logSegmentPath = logMetadata.getLogSegmentPath(segment);
            LogSegmentMetadata cachedSegment = metadataCache.get(logSegmentPath);
            if (null != cachedSegment) {
                // cache hit: no store round-trip needed for this segment
                addedSegments.put(segment, cachedSegment);
                completeReadLogSegmentsFromStore(
                        removedSegments,
                        addedSegments,
                        comparator,
                        readResult,
                        logSegmentNames.getVersion(),
                        numChildren,
                        numFailures);
                continue;
            }
            metadataStore.getLogSegment(logSegmentPath)
                    .addEventListener(new FutureEventListener<LogSegmentMetadata>() {

                        @Override
                        public void onSuccess(LogSegmentMetadata result) {
                            addedSegments.put(segment, result);
                            complete();
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            // LogSegmentNotFoundException exception is possible in two cases
                            // 1. A log segment was deleted by truncation between the call to getChildren and read
                            // attempt on the znode corresponding to the segment
                            // 2. In progress segment has been completed => inprogress ZNode does not exist
                            if (cause instanceof LogSegmentNotFoundException) {
                                removedSegments.add(segment);
                                complete();
                            } else {
                                // fail fast: only the first failure sets the exception
                                if (1 == numFailures.incrementAndGet()) {
                                    FutureUtils.setException(readResult, cause);
                                    return;
                                }
                            }
                        }

                        private void complete() {
                            completeReadLogSegmentsFromStore(
                                    removedSegments,
                                    addedSegments,
                                    comparator,
                                    readResult,
                                    logSegmentNames.getVersion(),
                                    numChildren,
                                    numFailures);
                        }
                    });
        }
    }

    /**
     * Called once per finished per-segment fetch; the invocation that drops
     * {@code numChildren} to zero (and only if no failure was recorded) applies
     * the accumulated add/remove diff to the caches and publishes the sorted
     * segment list into {@code readResult}.
     */
    private void completeReadLogSegmentsFromStore(final Set<String> removedSegments,
                                                  final Map<String, LogSegmentMetadata> addedSegments,
                                                  final Comparator<LogSegmentMetadata> comparator,
                                                  final Promise<Versioned<List<LogSegmentMetadata>>> readResult,
                                                  final Version logSegmentNamesVersion,
                                                  final AtomicInteger numChildren,
                                                  final AtomicInteger numFailures) {
        if (0 != numChildren.decrementAndGet()) {
            return;
        }
        if (numFailures.get() > 0) {
            // readResult was already failed by the fail-fast path
            return;
        }
        // update the cache only when fetch completed and before #getCachedLogSegments
        updateLogSegmentCache(removedSegments, addedSegments);
        List<LogSegmentMetadata> segmentList;
        try {
            segmentList = getCachedLogSegments(comparator);
        } catch (UnexpectedException e) {
            FutureUtils.setException(readResult, e);
            return;
        }
        FutureUtils.setValue(readResult,
                new Versioned<List<LogSegmentMetadata>>(segmentList, logSegmentNamesVersion));
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java new file mode 100644 index 0000000..c6e2e07 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java @@ -0,0 +1,431 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.callback.LogSegmentNamesListener; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.DLIllegalStateException; +import org.apache.distributedlog.exceptions.LockingException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.exceptions.LogSegmentNotFoundException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.logsegment.LogSegmentEntryStore; +import org.apache.distributedlog.metadata.LogMetadataForReader; +import org.apache.distributedlog.lock.DistributedLock; +import org.apache.distributedlog.logsegment.LogSegmentFilter; +import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; +import org.apache.distributedlog.metadata.LogStreamMetadataStore; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.Utils; +import com.twitter.util.ExceptionalFunction; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Throw; +import com.twitter.util.Try; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.bookkeeper.versioning.Version; +import 
org.apache.bookkeeper.versioning.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import javax.annotation.Nullable; + +/** + * Log Handler for Readers. + * <h3>Metrics</h3> + * + * <h4>ReadAhead Worker</h4> + * Most of readahead stats are exposed under scope `readahead_worker`. Only readahead exceptions are exposed + * in parent scope via <code>readAheadExceptionsLogger</code>. + * <ul> + * <li> `readahead_worker`/wait: counter. number of waits that readahead worker is waiting. If this keeps increasing, + * it usually means readahead keep getting full because of reader slows down reading. + * <li> `readahead_worker`/repositions: counter. number of repositions that readhead worker encounters. reposition + * means that a readahead worker finds that it isn't advancing to a new log segment and force re-positioning. + * <li> `readahead_worker`/entry_piggy_back_hits: counter. it increases when the last add confirmed being advanced + * because of the piggy-back lac. + * <li> `readahead_worker`/entry_piggy_back_misses: counter. it increases when the last add confirmed isn't advanced + * by a read entry because it doesn't piggy back a newer lac. + * <li> `readahead_worker`/read_entries: opstats. stats on number of entries read per readahead read batch. + * <li> `readahead_worker`/read_lac_counter: counter. stats on the number of readLastConfirmed operations + * <li> `readahead_worker`/read_lac_and_entry_counter: counter. stats on the number of readLastConfirmedAndEntry + * operations. + * <li> `readahead_worker`/cache_full: counter. it increases each time readahead worker finds cache become full. + * If it keeps increasing, that means reader slows down reading. + * <li> `readahead_worker`/resume: opstats. stats on readahead worker resuming reading from wait state. + * <li> `readahead_worker`/read_lac_lag: opstats. 
stats on the number of entries diff between the lac reader knew + * last time and the lac that it received. if `lag` between two subsequent lacs is high, that might mean the delay + * might be high. because reader is only allowed to read entries after lac is advanced. + * <li> `readahead_worker`/long_poll_interruption: opstats. stats on the number of interruptions happened to long + * poll. the interruptions are usually because of receiving zookeeper notifications. + * <li> `readahead_worker`/notification_execution: opstats. stats on executions over the notifications received from + * zookeeper. + * <li> `readahead_worker`/metadata_reinitialization: opstats. stats on metadata reinitialization after receiving + * notification from log segments updates. + * <li> `readahead_worker`/idle_reader_warn: counter. it increases each time the readahead worker detects itself + * becoming idle. + * </ul> + * <h4>Read Lock</h4> + * All read lock related stats are exposed under scope `read_lock`. + * See that scope for detailed stats.
+ */ +class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { + static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); + + protected final LogMetadataForReader logMetadataForReader; + + protected final DynamicDistributedLogConfiguration dynConf; + + private final Optional<String> subscriberId; + private DistributedLock readLock; + private Future<Void> lockAcquireFuture; + + // notify the state change about the read handler + protected final AsyncNotification readerStateNotification; + + // log segments listener + protected boolean logSegmentsNotificationDisabled = false; + protected final CopyOnWriteArraySet<LogSegmentListener> listeners = + new CopyOnWriteArraySet<LogSegmentListener>(); + protected Versioned<List<LogSegmentMetadata>> lastNotifiedLogSegments = + new Versioned<List<LogSegmentMetadata>>(null, Version.NEW); + + // stats + private final StatsLogger perLogStatsLogger; + + /** + * Construct a Bookkeeper journal manager. + */ + BKLogReadHandler(LogMetadataForReader logMetadata, + Optional<String> subscriberId, + DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + LogStreamMetadataStore streamMetadataStore, + LogSegmentMetadataCache metadataCache, + LogSegmentEntryStore entryStore, + OrderedScheduler scheduler, + AlertStatsLogger alertStatsLogger, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + String clientId, + AsyncNotification readerStateNotification, + boolean isHandleForReading) { + super(logMetadata, + conf, + streamMetadataStore, + metadataCache, + entryStore, + scheduler, + statsLogger, + alertStatsLogger, + clientId); + this.logMetadataForReader = logMetadata; + this.dynConf = dynConf; + this.perLogStatsLogger = + isHandleForReading ? 
perLogStatsLogger : NullStatsLogger.INSTANCE; + this.readerStateNotification = readerStateNotification; + this.subscriberId = subscriberId; + } + + @VisibleForTesting + String getReadLockPath() { + return logMetadataForReader.getReadLockPath(subscriberId); + } + + <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> result) { + scheduler.submit(new SafeRunnable() { + @Override + public void safeRun() { + promise.update(result); + } + }); + } + + Future<Void> checkLogStreamExists() { + return streamMetadataStore.logExists(logMetadata.getUri(), logMetadata.getLogName()); + } + + /** + * Elective stream lock--readers are not required to acquire the lock before using the stream. + */ + synchronized Future<Void> lockStream() { + if (null == lockAcquireFuture) { + lockAcquireFuture = streamMetadataStore.createReadLock(logMetadataForReader, subscriberId) + .flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() { + @Override + public Future<Void> applyE(DistributedLock lock) throws Throwable { + BKLogReadHandler.this.readLock = lock; + LOG.info("acquiring readlock {} at {}", getLockClientId(), getReadLockPath()); + return acquireLockOnExecutorThread(lock); + } + }); + } + return lockAcquireFuture; + } + + /** + * Begin asynchronous lock acquire, but ensure that the returned future is satisfied on an + * executor service thread. + */ + Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException { + final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire(); + + // The future we return must be satisfied on an executor service thread. If we simply + // return the future returned by asyncAcquire, user callbacks may end up running in + // the lock state executor thread, which will cause deadlocks and introduce latency + // etc. 
+ final Promise<Void> threadAcquirePromise = new Promise<Void>(); + threadAcquirePromise.setInterruptHandler(new Function<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable t) { + FutureUtils.cancel(acquireFuture); + return null; + } + }); + acquireFuture.addEventListener(new FutureEventListener<DistributedLock>() { + @Override + public void onSuccess(DistributedLock lock) { + LOG.info("acquired readlock {} at {}", getLockClientId(), getReadLockPath()); + satisfyPromiseAsync(threadAcquirePromise, new Return<Void>(null)); + } + + @Override + public void onFailure(Throwable cause) { + LOG.info("failed to acquire readlock {} at {}", + new Object[]{ getLockClientId(), getReadLockPath(), cause }); + satisfyPromiseAsync(threadAcquirePromise, new Throw<Void>(cause)); + } + }); + return threadAcquirePromise; + } + + /** + * Check ownership of elective stream lock. + */ + void checkReadLock() throws DLIllegalStateException, LockingException { + synchronized (this) { + if ((null == lockAcquireFuture) || + (!lockAcquireFuture.isDefined())) { + throw new DLIllegalStateException("Attempt to check for lock before it has been acquired successfully"); + } + } + + readLock.checkOwnership(); + } + + public Future<Void> asyncClose() { + DistributedLock lockToClose; + synchronized (this) { + if (null != lockAcquireFuture && !lockAcquireFuture.isDefined()) { + FutureUtils.cancel(lockAcquireFuture); + } + lockToClose = readLock; + } + return Utils.closeSequence(scheduler, lockToClose) + .flatMap(new AbstractFunction1<Void, Future<Void>>() { + @Override + public Future<Void> apply(Void result) { + // unregister the log segment listener + metadataStore.unregisterLogSegmentListener(logMetadata.getLogSegmentsPath(), BKLogReadHandler.this); + return Future.Void(); + } + }); + } + + @Override + public Future<Void> asyncAbort() { + return asyncClose(); + } + + /** + * Start fetch the log segments and register the {@link LogSegmentNamesListener}. 
+ * The future is satisfied only on a successful fetch or encountered a fatal failure. + * + * @return future represents the fetch result + */ + Future<Versioned<List<LogSegmentMetadata>>> asyncStartFetchLogSegments() { + Promise<Versioned<List<LogSegmentMetadata>>> promise = + new Promise<Versioned<List<LogSegmentMetadata>>>(); + asyncStartFetchLogSegments(promise); + return promise; + } + + void asyncStartFetchLogSegments(final Promise<Versioned<List<LogSegmentMetadata>>> promise) { + readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + this).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof LogNotFoundException || + cause instanceof LogSegmentNotFoundException || + cause instanceof UnexpectedException) { + // indicate some inconsistent behavior, abort + metadataException.compareAndSet(null, (IOException) cause); + // notify the reader that read handler is in error state + notifyReaderOnError(cause); + FutureUtils.setException(promise, cause); + return; + } + scheduler.schedule(new Runnable() { + @Override + public void run() { + asyncStartFetchLogSegments(promise); + } + }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) { + // no-op + FutureUtils.setValue(promise, segments); + } + }); + } + + @VisibleForTesting + void disableReadAheadLogSegmentsNotification() { + logSegmentsNotificationDisabled = true; + } + + @Override + public void onSegmentsUpdated(final Versioned<List<String>> segments) { + synchronized (this) { + if (lastNotifiedLogSegments.getVersion() != Version.NEW && + lastNotifiedLogSegments.getVersion().compare(segments.getVersion()) != Version.Occurred.BEFORE) { + // the log segments has been read, and it is possibly a retry from last segments update + return; + } + } + + 
Promise<Versioned<List<LogSegmentMetadata>>> readLogSegmentsPromise = + new Promise<Versioned<List<LogSegmentMetadata>>>(); + readLogSegmentsPromise.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof LogNotFoundException || + cause instanceof LogSegmentNotFoundException || + cause instanceof UnexpectedException) { + // indicate some inconsistent behavior, abort + metadataException.compareAndSet(null, (IOException) cause); + // notify the reader that read handler is in error state + notifyReaderOnError(cause); + return; + } + scheduler.schedule(new Runnable() { + @Override + public void run() { + onSegmentsUpdated(segments); + } + }, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS); + } + + @Override + public void onSuccess(Versioned<List<LogSegmentMetadata>> logSegments) { + List<LogSegmentMetadata> segmentsToNotify = null; + synchronized (BKLogReadHandler.this) { + Versioned<List<LogSegmentMetadata>> lastLogSegments = lastNotifiedLogSegments; + if (lastLogSegments.getVersion() == Version.NEW || + lastLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) { + lastNotifiedLogSegments = logSegments; + segmentsToNotify = logSegments.getValue(); + } + } + if (null != segmentsToNotify) { + notifyUpdatedLogSegments(segmentsToNotify); + } + } + }); + // log segments list is updated, read their metadata + readLogSegmentsFromStore( + segments, + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + readLogSegmentsPromise); + } + + @Override + public void onLogStreamDeleted() { + notifyLogStreamDeleted(); + } + + // + // Listener for log segments + // + + protected void registerListener(@Nullable LogSegmentListener listener) { + if (null != listener) { + listeners.add(listener); + } + } + + protected void unregisterListener(@Nullable LogSegmentListener listener) { + if (null != listener) { + 
listeners.remove(listener); + } + } + + protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) { + if (logSegmentsNotificationDisabled) { + return; + } + + for (LogSegmentListener listener : listeners) { + List<LogSegmentMetadata> listToReturn = + new ArrayList<LogSegmentMetadata>(segments); + Collections.sort(listToReturn, LogSegmentMetadata.COMPARATOR); + listener.onSegmentsUpdated(listToReturn); + } + } + + protected void notifyLogStreamDeleted() { + if (logSegmentsNotificationDisabled) { + return; + } + + for (LogSegmentListener listener : listeners) { + listener.onLogStreamDeleted(); + } + } + + // notify the errors + protected void notifyReaderOnError(Throwable cause) { + if (null != readerStateNotification) { + readerStateNotification.notifyOnError(cause); + } + } +}