http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java new file mode 100644 index 0000000..00e6b5c --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java @@ -0,0 +1,1106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.LogEmptyException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.exceptions.UnexpectedException; +import org.apache.distributedlog.function.CloseAsyncCloseableFunction; +import org.apache.distributedlog.function.GetVersionedValueFunction; +import org.apache.distributedlog.injector.AsyncFailureInjector; +import org.apache.distributedlog.logsegment.LogSegmentEntryStore; +import org.apache.distributedlog.logsegment.LogSegmentEntryWriter; +import org.apache.distributedlog.metadata.LogMetadataForReader; +import org.apache.distributedlog.metadata.LogMetadataForWriter; +import org.apache.distributedlog.io.AsyncCloseable; +import org.apache.distributedlog.lock.DistributedLock; +import org.apache.distributedlog.lock.NopDistributedLock; +import org.apache.distributedlog.lock.ZKDistributedLock; +import org.apache.distributedlog.logsegment.LogSegmentFilter; +import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; +import org.apache.distributedlog.metadata.LogStreamMetadataStore; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.stats.BroadCastStatsLogger; +import org.apache.distributedlog.subscription.SubscriptionsStore; +import org.apache.distributedlog.util.Allocator; +import org.apache.distributedlog.util.DLUtils; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.MonitoredFuturePool; +import org.apache.distributedlog.util.OrderedScheduler; +import 
org.apache.distributedlog.util.PermitLimiter; +import org.apache.distributedlog.util.PermitManager; +import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.util.Utils; +import com.twitter.util.ExceptionalFunction; +import com.twitter.util.ExceptionalFunction0; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.AlertStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction0; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER; +import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; + +/** + * <h3>Metrics</h3> + * <ul> + * <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`. + * See {@link BKAsyncLogWriter} for detail stats. + * <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`. + * See {@link BKAsyncLogReader} for detail stats. + * <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under + * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats. + * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under + * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats. + * <li> `lock/*`: metrics about the locks used by writers. 
See {@link ZKDistributedLock} for detail + * stats. + * <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for + * detail stats. + * <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details. + * <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details. + * <li> `readahead_worker/*`: metrics about readahead workers used by readers. See {@link BKLogReadHandler} + * for details. + * </ul> + */ +class BKDistributedLogManager implements DistributedLogManager { + static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class); + + static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = + new Function<LogRecordWithDLSN, Long>() { + @Override + public Long apply(LogRecordWithDLSN record) { + return record.getTransactionId(); + } + }; + + static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION = + new Function<LogRecordWithDLSN, DLSN>() { + @Override + public DLSN apply(LogRecordWithDLSN record) { + return record.getDlsn(); + } + }; + + private final URI uri; + private final String name; + private final String clientId; + private final int regionId; + private final String streamIdentifier; + private final DistributedLogConfiguration conf; + private final DynamicDistributedLogConfiguration dynConf; + private final NamespaceDriver driver; + private Promise<Void> closePromise; + private final OrderedScheduler scheduler; + private final FeatureProvider featureProvider; + private final AsyncFailureInjector failureInjector; + private final StatsLogger statsLogger; + private final StatsLogger perLogStatsLogger; + final AlertStatsLogger alertStatsLogger; + + // log segment metadata cache + private final LogSegmentMetadataCache logSegmentMetadataCache; + + // + // Writer Related Variables + // + private final PermitLimiter writeLimiter; + + // + // Reader Related Variables + /// + // read handler 
for listener. + private BKLogReadHandler readHandlerForListener = null; + private final PendingReaders pendingReaders; + + // resource to close + private final Optional<AsyncCloseable> resourcesCloseable; + + /** + * Create a {@link DistributedLogManager} with supplied resources. + * + * @param name log name + * @param conf distributedlog configuration + * @param dynConf dynamic distributedlog configuration + * @param uri uri location for the log + * @param driver namespace driver + * @param logSegmentMetadataCache log segment metadata cache + * @param scheduler ordered scheduled used by readers and writers + * @param clientId client id that used to initiate the locks + * @param regionId region id that would be encrypted as part of log segment metadata + * to indicate which region that the log segment will be created + * @param writeLimiter write limiter + * @param featureProvider provider to offer features + * @param statsLogger stats logger to receive stats + * @param perLogStatsLogger stats logger to receive per log stats + * @throws IOException + */ + BKDistributedLogManager(String name, + DistributedLogConfiguration conf, + DynamicDistributedLogConfiguration dynConf, + URI uri, + NamespaceDriver driver, + LogSegmentMetadataCache logSegmentMetadataCache, + OrderedScheduler scheduler, + String clientId, + Integer regionId, + PermitLimiter writeLimiter, + FeatureProvider featureProvider, + AsyncFailureInjector failureInjector, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + Optional<AsyncCloseable> resourcesCloseable) { + this.name = name; + this.conf = conf; + this.dynConf = dynConf; + this.uri = uri; + this.driver = driver; + this.logSegmentMetadataCache = logSegmentMetadataCache; + this.scheduler = scheduler; + this.statsLogger = statsLogger; + this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger); + this.pendingReaders = new PendingReaders(scheduler); + this.regionId = regionId; + this.clientId = clientId; + 
this.streamIdentifier = conf.getUnpartitionedStreamName(); + this.writeLimiter = writeLimiter; + // Feature Provider + this.featureProvider = featureProvider; + // Failure Injector + this.failureInjector = failureInjector; + // Stats + this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert"); + this.resourcesCloseable = resourcesCloseable; + } + + @Override + public String getStreamName() { + return name; + } + + @Override + public NamespaceDriver getNamespaceDriver() { + return driver; + } + + URI getUri() { + return uri; + } + + DistributedLogConfiguration getConf() { + return conf; + } + + OrderedScheduler getScheduler() { + return scheduler; + } + + AsyncFailureInjector getFailureInjector() { + return failureInjector; + } + + // + // Test Methods + // + + @VisibleForTesting + LogStreamMetadataStore getWriterMetadataStore() { + return driver.getLogStreamMetadataStore(WRITER); + } + + @VisibleForTesting + LogSegmentEntryStore getReaderEntryStore() { + return driver.getLogSegmentEntryStore(READER); + } + + @VisibleForTesting + FeatureProvider getFeatureProvider() { + return this.featureProvider; + } + + private synchronized BKLogReadHandler getReadHandlerAndRegisterListener( + boolean create, LogSegmentListener listener) { + if (null == readHandlerForListener && create) { + readHandlerForListener = createReadHandler(); + readHandlerForListener.registerListener(listener); + // start fetch the log segments after created the listener + readHandlerForListener.asyncStartFetchLogSegments(); + return readHandlerForListener; + } + if (null != readHandlerForListener && null != listener) { + readHandlerForListener.registerListener(listener); + } + return readHandlerForListener; + } + + @Override + public List<LogSegmentMetadata> getLogSegments() throws IOException { + return FutureUtils.result(getLogSegmentsAsync()); + } + + protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() { + final BKLogReadHandler readHandler = createReadHandler(); 
+ return readHandler.readLogSegmentsFromStore( + LogSegmentMetadata.COMPARATOR, + LogSegmentFilter.DEFAULT_FILTER, + null) + .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC) + .ensure(CloseAsyncCloseableFunction.of(readHandler)); + } + + @Override + public void registerListener(LogSegmentListener listener) throws IOException { + getReadHandlerAndRegisterListener(true, listener); + } + + @Override + public synchronized void unregisterListener(LogSegmentListener listener) { + if (null != readHandlerForListener) { + readHandlerForListener.unregisterListener(listener); + } + } + + public void checkClosedOrInError(String operation) throws AlreadyClosedException { + synchronized (this) { + if (null != closePromise) { + throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager"); + } + } + } + + // Create Read Handler + + synchronized BKLogReadHandler createReadHandler() { + Optional<String> subscriberId = Optional.absent(); + return createReadHandler(subscriberId, false); + } + + synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId) { + return createReadHandler(subscriberId, false); + } + + synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, + boolean isHandleForReading) { + return createReadHandler( + subscriberId, + null, + isHandleForReading); + } + + synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, + AsyncNotification notification, + boolean isHandleForReading) { + LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier); + return new BKLogReadHandler( + logMetadata, + subscriberId, + conf, + dynConf, + driver.getLogStreamMetadataStore(READER), + logSegmentMetadataCache, + driver.getLogSegmentEntryStore(READER), + scheduler, + alertStatsLogger, + statsLogger, + perLogStatsLogger, + clientId, + notification, + isHandleForReading); + } + + // Create Ledger Allocator + + + + // Create Write Handler + + 
public BKLogWriteHandler createWriteHandler(boolean lockHandler) + throws IOException { + return FutureUtils.result(asyncCreateWriteHandler(lockHandler)); + } + + Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) { + // Fetching Log Metadata (create if not exists) + return driver.getLogStreamMetadataStore(WRITER).getLog( + uri, + name, + true, + conf.getCreateStreamIfNotExists() + ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() { + @Override + public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) { + Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>(); + createWriteHandler(logMetadata, lockHandler, createPromise); + return createPromise; + } + }); + } + + private void createWriteHandler(LogMetadataForWriter logMetadata, + boolean lockHandler, + final Promise<BKLogWriteHandler> createPromise) { + // Build the locks + DistributedLock lock; + if (conf.isWriteLockEnabled()) { + lock = driver.getLogStreamMetadataStore(WRITER).createWriteLock(logMetadata); + } else { + lock = NopDistributedLock.INSTANCE; + } + + Allocator<LogSegmentEntryWriter, Object> segmentAllocator; + try { + segmentAllocator = driver.getLogSegmentEntryStore(WRITER) + .newLogSegmentAllocator(logMetadata, dynConf); + } catch (IOException ioe) { + FutureUtils.setException(createPromise, ioe); + return; + } + + // Make sure writer handler created before resources are initialized + final BKLogWriteHandler writeHandler = new BKLogWriteHandler( + logMetadata, + conf, + driver.getLogStreamMetadataStore(WRITER), + logSegmentMetadataCache, + driver.getLogSegmentEntryStore(WRITER), + scheduler, + segmentAllocator, + statsLogger, + perLogStatsLogger, + alertStatsLogger, + clientId, + regionId, + writeLimiter, + featureProvider, + dynConf, + lock); + if (lockHandler) { + writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() { + @Override + public void onSuccess(DistributedLock 
lock) { + FutureUtils.setValue(createPromise, writeHandler); + } + + @Override + public void onFailure(final Throwable cause) { + writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + FutureUtils.setException(createPromise, cause); + return BoxedUnit.UNIT; + } + }); + } + }); + } else { + FutureUtils.setValue(createPromise, writeHandler); + } + } + + PermitManager getLogSegmentRollingPermitManager() { + return driver.getLogStreamMetadataStore(WRITER).getPermitManager(); + } + + <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) { + return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() { + @Override + public BKLogReadHandler applyE() throws Throwable { + return getReadHandlerAndRegisterListener(true, null); + } + }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() { + @Override + public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable { + return func.apply(readHandler); + } + }); + } + + /** + * Check if an end of stream marker was added to the stream + * A stream with an end of stream marker cannot be appended to + * + * @return true if the marker was added to the stream, false otherwise + */ + @Override + public boolean isEndOfStreamMarked() throws IOException { + checkClosedOrInError("isEndOfStreamMarked"); + long lastTxId = FutureUtils.result(getLastLogRecordAsyncInternal(false, true)).getTransactionId(); + return lastTxId == DistributedLogConstants.MAX_TXID; + } + + /** + * Begin appending to the end of the log stream which is being treated as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException { + long position; + try { + position = FutureUtils.result(getLastLogRecordAsyncInternal(true, false)).getTransactionId(); + if (DistributedLogConstants.INVALID_TXID == position || + 
DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) { + position = 0; + } + } catch (LogEmptyException ex) { + position = 0; + } catch (LogNotFoundException ex) { + position = 0; + } + return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), position); + } + + /** + * Get a reader to read a log stream as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException { + return new AppendOnlyStreamReader(this); + } + + /** + * Begin writing to the log stream identified by the name + * + * @return the writer interface to generate log records + */ + @Override + public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException { + checkClosedOrInError("startLogSegmentNonPartitioned"); + BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this); + boolean success = false; + try { + writer.createAndCacheWriteHandler(); + BKLogWriteHandler writeHandler = writer.getWriteHandler(); + FutureUtils.result(writeHandler.lockHandler()); + success = true; + return writer; + } finally { + if (!success) { + writer.abort(); + } + } + } + + /** + * Begin writing to the log stream identified by the name + * + * @return the writer interface to generate log records + */ + @Override + public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException { + return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter()); + } + + @Override + public Future<AsyncLogWriter> openAsyncLogWriter() { + try { + checkClosedOrInError("startLogSegmentNonPartitioned"); + } catch (AlreadyClosedException e) { + return Future.exception(e); + } + + Future<BKLogWriteHandler> createWriteHandleFuture; + synchronized (this) { + // 1. 
create the locked write handler + createWriteHandleFuture = asyncCreateWriteHandler(true); + } + return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() { + @Override + public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) { + final BKAsyncLogWriter writer; + synchronized (BKDistributedLogManager.this) { + // 2. create the writer with the handler + writer = new BKAsyncLogWriter( + conf, + dynConf, + BKDistributedLogManager.this, + writeHandler, + featureProvider, + statsLogger); + } + // 3. recover the incomplete log segments + return writeHandler.recoverIncompleteLogSegments() + .map(new AbstractFunction1<Long, AsyncLogWriter>() { + @Override + public AsyncLogWriter apply(Long lastTxId) { + // 4. update last tx id if successfully recovered + writer.setLastTxId(lastTxId); + return writer; + } + }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable cause) { + // 5. close the writer if recovery failed + writer.asyncAbort(); + return BoxedUnit.UNIT; + } + }); + } + }); + } + + @Override + public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) { + return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() { + @Override + public Future<DLSN> apply(List<LogSegmentMetadata> segments) { + return getDLSNNotLessThanTxId(fromTxnId, segments); + } + }); + } + + private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId, + final List<LogSegmentMetadata> segments) { + if (segments.isEmpty()) { + return getLastDLSNAsync(); + } + final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId); + if (segmentIdx < 0) { + return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L)); + } + return getDLSNNotLessThanTxIdInSegment( + fromTxnId, + segmentIdx, + segments, + driver.getLogSegmentEntryStore(READER) + ); + } + + private Future<DLSN> 
getDLSNNotLessThanTxIdInSegment(final long fromTxnId, + final int segmentIdx, + final List<LogSegmentMetadata> segments, + final LogSegmentEntryStore entryStore) { + final LogSegmentMetadata segment = segments.get(segmentIdx); + return ReadUtils.getLogRecordNotLessThanTxId( + name, + segment, + fromTxnId, + scheduler, + entryStore, + Math.max(2, dynConf.getReadAheadBatchSize()) + ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() { + @Override + public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) { + if (foundRecord.isPresent()) { + return Future.value(foundRecord.get().getDlsn()); + } + if ((segments.size() - 1) == segmentIdx) { + return getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() { + @Override + public DLSN apply(LogRecordWithDLSN record) { + if (record.getTransactionId() >= fromTxnId) { + return record.getDlsn(); + } + return record.getDlsn().getNextDLSN(); + } + }); + } else { + return getDLSNNotLessThanTxIdInSegment( + fromTxnId, + segmentIdx + 1, + segments, + entryStore); + } + } + }); + } + + /** + * Get the input stream starting with fromTxnId for the specified log + * + * @param fromTxnId - the first transaction id we want to read + * @return the stream starting with transaction fromTxnId + * @throws IOException if a stream cannot be found. + */ + @Override + public LogReader getInputStream(long fromTxnId) + throws IOException { + return getInputStreamInternal(fromTxnId); + } + + @Override + public LogReader getInputStream(DLSN fromDLSN) throws IOException { + return getInputStreamInternal(fromDLSN, Optional.<Long>absent()); + } + + @Override + public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException { + return FutureUtils.result(openAsyncLogReader(fromTxnId)); + } + + /** + * Opening a log reader positioning by transaction id <code>fromTxnId</code>. 
+ * + * <p> + * - retrieve log segments for the stream + * - if the log segment list is empty, positioning by the last dlsn + * - otherwise, find the first log segment that contains the records whose transaction ids are not less than + * the provided transaction id <code>fromTxnId</code> + * - if all log segments' records' transaction ids are more than <code>fromTxnId</code>, positioning + * on the first record. + * - otherwise, search the log segment to find the log record + * - if the log record is found, positioning the reader by that found record's dlsn + * - otherwise, positioning by the last dlsn + * </p> + * + * @see DLUtils#findLogSegmentNotLessThanTxnId(List, long) + * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LogSegmentEntryStore, int) + * @param fromTxnId + * transaction id to start reading from + * @return future representing the open result. + */ + @Override + public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) { + final Promise<DLSN> dlsnPromise = new Promise<DLSN>(); + getDLSNNotLessThanTxId(fromTxnId).addEventListener(new FutureEventListener<DLSN>() { + + @Override + public void onSuccess(DLSN dlsn) { + dlsnPromise.setValue(dlsn); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof LogEmptyException) { + dlsnPromise.setValue(DLSN.InitialDLSN); + } else { + dlsnPromise.setException(cause); + } + } + }); + return dlsnPromise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() { + @Override + public Future<AsyncLogReader> apply(DLSN dlsn) { + return openAsyncLogReader(dlsn); + } + }); + } + + @Override + public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException { + return FutureUtils.result(openAsyncLogReader(fromDLSN)); + } + + @Override + public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) { + Optional<String> subscriberId = Optional.absent(); + AsyncLogReader reader = new BKAsyncLogReader( + this, + scheduler, + 
fromDLSN, + subscriberId, + false, + statsLogger); + pendingReaders.add(reader); + return Future.value(reader); + } + + /** + * Note the lock here is a sort of elective exclusive lock. I.e. acquiring this lock will only prevent other + * people who try to acquire the lock from reading from the stream. Normal readers (and writers) will not be + * blocked. + */ + @Override + public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) { + Optional<String> subscriberId = Optional.absent(); + return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId); + } + + @Override + public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) { + return getAsyncLogReaderWithLock(Optional.of(fromDLSN), Optional.of(subscriberId)); + } + + @Override + public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) { + Optional<DLSN> fromDLSN = Optional.absent(); + return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId)); + } + + protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN, + final Optional<String> subscriberId) { + if (!fromDLSN.isPresent() && !subscriberId.isPresent()) { + return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided.")); + } + final BKAsyncLogReader reader = new BKAsyncLogReader( + BKDistributedLogManager.this, + scheduler, + fromDLSN.isPresent() ? 
fromDLSN.get() : DLSN.InitialDLSN, + subscriberId, + false, + statsLogger); + pendingReaders.add(reader); + final Future<Void> lockFuture = reader.lockStream(); + final Promise<AsyncLogReader> createPromise = new Promise<AsyncLogReader>( + new Function<Throwable, BoxedUnit>() { + @Override + public BoxedUnit apply(Throwable cause) { + // cancel the lock when the creation future is cancelled + lockFuture.cancel(); + return BoxedUnit.UNIT; + } + }); + // lock the stream - fetch the last commit position on success + lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() { + @Override + public Future<AsyncLogReader> apply(Void complete) { + if (fromDLSN.isPresent()) { + return Future.value((AsyncLogReader) reader); + } + LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", + subscriberId.get(), name); + // we acquired lock + final SubscriptionsStore subscriptionsStore = driver.getSubscriptionsStore(getStreamName()); + return subscriptionsStore.getLastCommitPosition(subscriberId.get()) + .map(new ExceptionalFunction<DLSN, AsyncLogReader>() { + @Override + public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException { + LOG.info("Reader {} @ {} positioned to last commit position {}.", + new Object[] { subscriberId.get(), name, lastCommitPosition }); + reader.setStartDLSN(lastCommitPosition); + return reader; + } + }); + } + }).addEventListener(new FutureEventListener<AsyncLogReader>() { + @Override + public void onSuccess(AsyncLogReader r) { + pendingReaders.remove(reader); + FutureUtils.setValue(createPromise, r); + } + + @Override + public void onFailure(final Throwable cause) { + pendingReaders.remove(reader); + reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + FutureUtils.setException(createPromise, cause); + return BoxedUnit.UNIT; + } + }); + } + }); + return createPromise; + } + + /** + * Get the input stream starting with fromTxnId 
     * for the specified log
     *
     * @param fromTxnId
     *          transaction id to start reading from
     * @return log reader
     * @throws IOException
     */
    LogReader getInputStreamInternal(long fromTxnId)
        throws IOException {
        DLSN fromDLSN;
        try {
            fromDLSN = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId));
        } catch (LogEmptyException lee) {
            // No record with txid >= fromTxnId exists yet: position at the head of the log.
            fromDLSN = DLSN.InitialDLSN;
        }
        return getInputStreamInternal(fromDLSN, Optional.of(fromTxnId));
    }

    /**
     * Open a sync log reader positioned at <i>fromDLSN</i>.
     *
     * @param fromDLSN dlsn to start reading from
     * @param fromTxnId transaction id originally requested by the caller, if any
     * @return sync log reader
     * @throws IOException if the manager is already closed or in an error state
     */
    LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId)
            throws IOException {
        LOG.info("Create sync reader starting from {}", fromDLSN);
        checkClosedOrInError("getInputStream");
        return new BKSyncLogReader(
                conf,
                this,
                fromDLSN,
                fromTxnId,
                statsLogger);
    }

    /**
     * Get the last log record in the stream.
     *
     * @return the last log record in the stream
     * @throws java.io.IOException if a stream cannot be found.
     */
    @Override
    public LogRecordWithDLSN getLastLogRecord() throws IOException {
        checkClosedOrInError("getLastLogRecord");
        // Sync facade over the async variant: blocks until the record is available.
        return FutureUtils.result(getLastLogRecordAsync());
    }

    /**
     * Get the transaction id of the first record in the stream (synchronous).
     */
    @Override
    public long getFirstTxId() throws IOException {
        checkClosedOrInError("getFirstTxId");
        return FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId();
    }

    /**
     * Get the transaction id of the last record in the stream (synchronous).
     */
    @Override
    public long getLastTxId() throws IOException {
        checkClosedOrInError("getLastTxId");
        return FutureUtils.result(getLastTxIdAsync());
    }

    /**
     * Get the DLSN of the last record in the stream (synchronous).
     */
    @Override
    public DLSN getLastDLSN() throws IOException {
        checkClosedOrInError("getLastDLSN");
        // recover = false, includeEndOfStream = false: only committed user records count.
        return FutureUtils.result(getLastLogRecordAsyncInternal(false, false)).getDlsn();
    }

    /**
     * Get Latest log record in the log
     *
     * @return latest log record
     */
    @Override
    public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
        return getLastLogRecordAsyncInternal(false, false);
    }

    // Fetch the last record via the read handler. `recover` forces recovery of the
    // last log segment first; `includeEndOfStream` also admits the end-of-stream
    // control record.
    private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover,
                                                                    final boolean includeEndOfStream) {
        return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
            @Override
            public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
                return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream);
            }
        });
    }

    /**
     * Get Latest Transaction Id in the log
     *
     * @return latest transaction id
     */
    @Override
    public Future<Long> getLastTxIdAsync() {
        return getLastLogRecordAsyncInternal(false, false)
            .map(RECORD_2_TXID_FUNCTION);
    }

    /**
     * Get first DLSN in the log.
     *
     * @return first dlsn in the stream
     */
    @Override
    public Future<DLSN> getFirstDLSNAsync() {
        return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION);
    }

    // Fetch the first (non-truncated) record via the read handler.
    private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() {
        return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() {
            @Override
            public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) {
                return ledgerHandler.asyncGetFirstLogRecord();
            }
        });
    }

    /**
     * Get Latest DLSN in the log.
     *
     * @return latest dlsn
     */
    @Override
    public Future<DLSN> getLastDLSNAsync() {
        return getLastLogRecordAsyncInternal(false, false)
            .map(RECORD_2_DLSN_FUNCTION);
    }

    /**
     * Get the number of log records in the active portion of the log.
     * Any log segments that have already been truncated will not be included.
     *
     * @return number of log records
     * @throws IOException
     */
    @Override
    public long getLogRecordCount() throws IOException {
        checkClosedOrInError("getLogRecordCount");
        return FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN));
    }

    /**
     * Get the number of log records in the active portion of the log asynchronously.
     * Any log segments that have already been truncated will not be included.
     *
     * @param beginDLSN count records from this position onwards
     * @return future number of log records
     */
    @Override
    public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) {
        return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() {
            @Override
            public Future<Long> apply(BKLogReadHandler ledgerHandler) {
                return ledgerHandler.asyncGetLogRecordCount(beginDLSN);
            }
        });
    }

    @Override
    public void recover() throws IOException {
        recoverInternal(conf.getUnpartitionedStreamName());
    }

    /**
     * Recover a specified stream within the log container
     * The writer implicitly recovers a topic when it resumes writing.
     * This allows applications to recover a container explicitly so
     * that application may read a fully recovered log before resuming
     * the writes
     *
     * @param streamIdentifier stream to recover
     *        NOTE(review): currently unused — recovery always operates on this
     *        manager's write handler; confirm whether per-stream recovery was intended.
     * @throws IOException if the recovery fails
     */
    private void recoverInternal(String streamIdentifier) throws IOException {
        checkClosedOrInError("recoverInternal");
        BKLogWriteHandler ledgerHandler = createWriteHandler(true);
        try {
            FutureUtils.result(ledgerHandler.recoverIncompleteLogSegments());
        } finally {
            // Always release the temporary write handler, even if recovery fails.
            Utils.closeQuietly(ledgerHandler);
        }
    }

    /**
     * Delete the log and its metadata.
     *
     * @throws IOException if the deletion fails
     */
    @Override
    public void delete() throws IOException {
        FutureUtils.result(driver.getLogStreamMetadataStore(WRITER)
                .deleteLog(uri, getStreamName()));
    }

    /**
     * The DistributedLogManager may archive/purge any logs for transactionId
     * less than or equal to minImageTxId.
     * This is to be used only when the client explicitly manages deletion. If
     * the cleanup policy is based on sliding time window, then this method need
     * not be called.
+ * + * @param minTxIdToKeep the earliest txid that must be retained + * @throws IOException if purging fails + */ + @Override + public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { + Preconditions.checkArgument(minTxIdToKeep > 0, "Invalid transaction id " + minTxIdToKeep); + checkClosedOrInError("purgeLogSegmentsOlderThan"); + BKLogWriteHandler ledgerHandler = createWriteHandler(true); + try { + LOG.info("Purging logs for {} older than {}", ledgerHandler.getFullyQualifiedName(), minTxIdToKeep); + FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep)); + } finally { + Utils.closeQuietly(ledgerHandler); + } + } + + static class PendingReaders implements AsyncCloseable { + + final ExecutorService executorService; + final Set<AsyncCloseable> readers = new HashSet<AsyncCloseable>(); + + PendingReaders(ExecutorService executorService) { + this.executorService = executorService; + } + + public synchronized void remove(AsyncCloseable reader) { + readers.remove(reader); + } + + public synchronized void add(AsyncCloseable reader) { + readers.add(reader); + } + + @Override + public Future<Void> asyncClose() { + return Utils.closeSequence(executorService, true, readers.toArray(new AsyncLogReader[readers.size()])) + .onSuccess(new AbstractFunction1<Void, BoxedUnit>() { + @Override + public BoxedUnit apply(Void value) { + readers.clear(); + return BoxedUnit.UNIT; + } + }); + } + }; + + /** + * Close the distributed log manager, freeing any resources it may hold. 
+ */ + @Override + public Future<Void> asyncClose() { + Promise<Void> closeFuture; + BKLogReadHandler readHandlerToClose; + synchronized (this) { + if (null != closePromise) { + return closePromise; + } + closeFuture = closePromise = new Promise<Void>(); + readHandlerToClose = readHandlerForListener; + } + + Future<Void> closeResult = Utils.closeSequence(null, true, + readHandlerToClose, + pendingReaders, + resourcesCloseable.or(AsyncCloseable.NULL)); + closeResult.proxyTo(closeFuture); + return closeFuture; + } + + @Override + public void close() throws IOException { + FutureUtils.result(asyncClose()); + } + + @Override + public String toString() { + return String.format("DLM:%s:%s", getUri(), getStreamName()); + } + + public void raiseAlert(String msg, Object... args) { + alertStatsLogger.raise(msg, args); + } + + @Override + public SubscriptionsStore getSubscriptionsStore() { + return driver.getSubscriptionsStore(getStreamName()); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java new file mode 100644 index 0000000..0a4608e --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog; + +import com.google.common.base.Optional; +import com.google.common.base.Ticker; +import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.callback.NamespaceListener; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.AlreadyClosedException; +import org.apache.distributedlog.exceptions.InvalidStreamNameException; +import org.apache.distributedlog.exceptions.LogNotFoundException; +import org.apache.distributedlog.injector.AsyncFailureInjector; +import org.apache.distributedlog.io.AsyncCloseable; +import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.util.PermitLimiter; +import org.apache.distributedlog.util.SchedulerUtils; +import org.apache.distributedlog.util.Utils; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; +import static org.apache.distributedlog.util.DLUtils.validateName; + +/** + * BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses + * zookeeper for metadata storage and bookkeeper for data storage. 
+ * <h3>Metrics</h3> + * + * <h4>ZooKeeper Client</h4> + * See {@link ZooKeeperClient} for detail sub-stats. + * <ul> + * <li> `scope`/dlzk_factory_writer_shared/* : stats about the zookeeper client shared by all DL writers. + * <li> `scope`/dlzk_factory_reader_shared/* : stats about the zookeeper client shared by all DL readers. + * <li> `scope`/bkzk_factory_writer_shared/* : stats about the zookeeper client used by bookkeeper client + * shared by all DL writers. + * <li> `scope`/bkzk_factory_reader_shared/* : stats about the zookeeper client used by bookkeeper client + * shared by all DL readers. + * </ul> + * + * <h4>BookKeeper Client</h4> + * BookKeeper client stats are exposed directly to current scope. See {@link BookKeeperClient} for detail stats. + * + * <h4>Utils</h4> + * <ul> + * <li> `scope`/factory/thread_pool/* : stats about the ordered scheduler used by this namespace. + * See {@link OrderedScheduler}. + * <li> `scope`/factory/readahead_thread_pool/* : stats about the readahead thread pool executor + * used by this namespace. See {@link MonitoredScheduledThreadPoolExecutor}. + * <li> `scope`/writeLimiter/* : stats about the global write limiter used by this namespace. + * See {@link PermitLimiter}. + * </ul> + * + * <h4>DistributedLogManager</h4> + * + * All the core stats about reader and writer are exposed under current scope via {@link BKDistributedLogManager}. 
+ */ +public class BKDistributedLogNamespace implements DistributedLogNamespace { + static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class); + + private final String clientId; + private final int regionId; + private final DistributedLogConfiguration conf; + private final URI namespace; + // namespace driver + private final NamespaceDriver driver; + // resources + private final OrderedScheduler scheduler; + private final PermitLimiter writeLimiter; + private final AsyncFailureInjector failureInjector; + // log segment metadata store + private final LogSegmentMetadataCache logSegmentMetadataCache; + // feature provider + private final FeatureProvider featureProvider; + // Stats Loggers + private final StatsLogger statsLogger; + private final StatsLogger perLogStatsLogger; + + protected final AtomicBoolean closed = new AtomicBoolean(false); + + public BKDistributedLogNamespace( + DistributedLogConfiguration conf, + URI uri, + NamespaceDriver driver, + OrderedScheduler scheduler, + FeatureProvider featureProvider, + PermitLimiter writeLimiter, + AsyncFailureInjector failureInjector, + StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + String clientId, + int regionId) { + this.conf = conf; + this.namespace = uri; + this.driver = driver; + this.scheduler = scheduler; + this.featureProvider = featureProvider; + this.writeLimiter = writeLimiter; + this.failureInjector = failureInjector; + this.statsLogger = statsLogger; + this.perLogStatsLogger = perLogStatsLogger; + this.clientId = clientId; + this.regionId = regionId; + + // create a log segment metadata cache + this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker()); + } + + @Override + public NamespaceDriver getNamespaceDriver() { + return driver; + } + + // + // Namespace Methods + // + + @Override + public void createLog(String logName) + throws InvalidStreamNameException, IOException { + checkState(); + validateName(logName); + URI uri = 
FutureUtils.result(driver.getLogMetadataStore().createLog(logName)); + FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true)); + } + + @Override + public void deleteLog(String logName) + throws InvalidStreamNameException, LogNotFoundException, IOException { + checkState(); + validateName(logName); + Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); + if (!uri.isPresent()) { + throw new LogNotFoundException("Log " + logName + " isn't found."); + } + DistributedLogManager dlm = openLogInternal( + uri.get(), + logName, + Optional.<DistributedLogConfiguration>absent(), + Optional.<DynamicDistributedLogConfiguration>absent()); + dlm.delete(); + } + + @Override + public DistributedLogManager openLog(String logName) + throws InvalidStreamNameException, IOException { + return openLog(logName, + Optional.<DistributedLogConfiguration>absent(), + Optional.<DynamicDistributedLogConfiguration>absent(), + Optional.<StatsLogger>absent()); + } + + @Override + public DistributedLogManager openLog(String logName, + Optional<DistributedLogConfiguration> logConf, + Optional<DynamicDistributedLogConfiguration> dynamicLogConf, + Optional<StatsLogger> perStreamStatsLogger) + throws InvalidStreamNameException, IOException { + checkState(); + validateName(logName); + Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); + if (!uri.isPresent()) { + throw new LogNotFoundException("Log " + logName + " isn't found."); + } + return openLogInternal( + uri.get(), + logName, + logConf, + dynamicLogConf); + } + + @Override + public boolean logExists(String logName) + throws IOException, IllegalArgumentException { + checkState(); + Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); + if (uri.isPresent()) { + try { + FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) + .logExists(uri.get(), logName)); + return true; + } catch 
(LogNotFoundException lnfe) { + return false; + } + } else { + return false; + } + } + + @Override + public Iterator<String> getLogs() throws IOException { + checkState(); + return FutureUtils.result(driver.getLogMetadataStore().getLogs()); + } + + @Override + public void registerNamespaceListener(NamespaceListener listener) { + driver.getLogMetadataStore().registerNamespaceListener(listener); + } + + @Override + public synchronized AccessControlManager createAccessControlManager() throws IOException { + checkState(); + return driver.getAccessControlManager(); + } + + /** + * Open the log in location <i>uri</i>. + * + * @param uri + * location to store the log + * @param nameOfLogStream + * name of the log + * @param logConfiguration + * optional stream configuration + * @param dynamicLogConfiguration + * dynamic stream configuration overrides. + * @return distributedlog manager instance. + * @throws InvalidStreamNameException if the stream name is invalid + * @throws IOException + */ + protected DistributedLogManager openLogInternal( + URI uri, + String nameOfLogStream, + Optional<DistributedLogConfiguration> logConfiguration, + Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration) + throws InvalidStreamNameException, IOException { + // Make sure the name is well formed + checkState(); + validateName(nameOfLogStream); + + DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration(); + mergedConfiguration.addConfiguration(conf); + mergedConfiguration.loadStreamConf(logConfiguration); + // If dynamic config was not provided, default to a static view of the global configuration. 
+ DynamicDistributedLogConfiguration dynConf = null; + if (dynamicLogConfiguration.isPresent()) { + dynConf = dynamicLogConfiguration.get(); + } else { + dynConf = ConfUtils.getConstDynConf(mergedConfiguration); + } + + return new BKDistributedLogManager( + nameOfLogStream, /* Log Name */ + mergedConfiguration, /* Configuration */ + dynConf, /* Dynamic Configuration */ + uri, /* Namespace URI */ + driver, /* Namespace Driver */ + logSegmentMetadataCache, /* Log Segment Metadata Cache */ + scheduler, /* DL scheduler */ + clientId, /* Client Id */ + regionId, /* Region Id */ + writeLimiter, /* Write Limiter */ + featureProvider.scope("dl"), /* Feature Provider */ + failureInjector, /* Failure Injector */ + statsLogger, /* Stats Logger */ + perLogStatsLogger, /* Per Log Stats Logger */ + Optional.<AsyncCloseable>absent() /* shared resources, we don't need to close any resources in dlm */ + ); + } + + /** + * Check the namespace state. + * + * @throws IOException + */ + private void checkState() throws IOException { + if (closed.get()) { + LOG.error("BK namespace {} is already closed", namespace); + throw new AlreadyClosedException("BK namespace " + namespace + " is already closed"); + } + } + + /** + * Close the distributed log manager factory, freeing any resources it may hold. + */ + @Override + public void close() { + if (!closed.compareAndSet(false, true)) { + return; + } + // shutdown the driver + Utils.close(driver); + // close the write limiter + this.writeLimiter.close(); + // Shutdown the schedulers + SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(), + TimeUnit.MILLISECONDS); + LOG.info("Executor Service Stopped."); + } +}