http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java deleted file mode 100644 index a3959b0..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ /dev/null @@ -1,1106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.twitter.distributedlog.callback.LogSegmentListener; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.LogEmptyException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.function.CloseAsyncCloseableFunction; -import com.twitter.distributedlog.function.GetVersionedValueFunction; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; -import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter; -import com.twitter.distributedlog.metadata.LogMetadataForReader; -import com.twitter.distributedlog.metadata.LogMetadataForWriter; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.lock.DistributedLock; -import com.twitter.distributedlog.lock.NopDistributedLock; -import com.twitter.distributedlog.lock.ZKDistributedLock; -import com.twitter.distributedlog.logsegment.LogSegmentFilter; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.stats.BroadCastStatsLogger; -import com.twitter.distributedlog.subscription.SubscriptionsStore; -import com.twitter.distributedlog.util.Allocator; -import com.twitter.distributedlog.util.DLUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.MonitoredFuturePool; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.PermitLimiter; -import com.twitter.distributedlog.util.PermitManager; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.ExceptionalFunction; -import com.twitter.util.ExceptionalFunction0; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.AlertStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.READER; -import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.WRITER; - -/** - * <h3>Metrics</h3> - * <ul> - * <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`. - * See {@link BKAsyncLogWriter} for detail stats. - * <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`. - * See {@link BKAsyncLogReader} for detail stats. - * <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under - * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats. - * <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under - * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats. - * <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail - * stats. - * <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for - * detail stats. - * <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details. - * <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details. - * <li> `readahead_worker/*`: metrics about readahead workers used by readers. See {@link BKLogReadHandler} - * for details. - * </ul> - */ -class BKDistributedLogManager implements DistributedLogManager { - static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogManager.class); - - static final Function<LogRecordWithDLSN, Long> RECORD_2_TXID_FUNCTION = - new Function<LogRecordWithDLSN, Long>() { - @Override - public Long apply(LogRecordWithDLSN record) { - return record.getTransactionId(); - } - }; - - static final Function<LogRecordWithDLSN, DLSN> RECORD_2_DLSN_FUNCTION = - new Function<LogRecordWithDLSN, DLSN>() { - @Override - public DLSN apply(LogRecordWithDLSN record) { - return record.getDlsn(); - } - }; - - private final URI uri; - private final String name; - private final String clientId; - private final int regionId; - private final String streamIdentifier; - private final DistributedLogConfiguration conf; - private final DynamicDistributedLogConfiguration dynConf; - private final NamespaceDriver driver; - private Promise<Void> closePromise; - private final OrderedScheduler scheduler; - private final FeatureProvider featureProvider; - private final AsyncFailureInjector failureInjector; - private final StatsLogger statsLogger; - private final StatsLogger perLogStatsLogger; - final AlertStatsLogger alertStatsLogger; - - // log segment metadata cache - private final LogSegmentMetadataCache logSegmentMetadataCache; - - // - // Writer Related Variables - // - private final PermitLimiter writeLimiter; - - // - // Reader Related Variables - /// - // read handler for listener. - private BKLogReadHandler readHandlerForListener = null; - private final PendingReaders pendingReaders; - - // resource to close - private final Optional<AsyncCloseable> resourcesCloseable; - - /** - * Create a {@link DistributedLogManager} with supplied resources. - * - * @param name log name - * @param conf distributedlog configuration - * @param dynConf dynamic distributedlog configuration - * @param uri uri location for the log - * @param driver namespace driver - * @param logSegmentMetadataCache log segment metadata cache - * @param scheduler ordered scheduled used by readers and writers - * @param clientId client id that used to initiate the locks - * @param regionId region id that would be encrypted as part of log segment metadata - * to indicate which region that the log segment will be created - * @param writeLimiter write limiter - * @param featureProvider provider to offer features - * @param statsLogger stats logger to receive stats - * @param perLogStatsLogger stats logger to receive per log stats - * @throws IOException - */ - BKDistributedLogManager(String name, - DistributedLogConfiguration conf, - DynamicDistributedLogConfiguration dynConf, - URI uri, - NamespaceDriver driver, - LogSegmentMetadataCache logSegmentMetadataCache, - OrderedScheduler scheduler, - String clientId, - Integer regionId, - PermitLimiter writeLimiter, - FeatureProvider featureProvider, - AsyncFailureInjector failureInjector, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - Optional<AsyncCloseable> resourcesCloseable) { - this.name = name; - this.conf = conf; - this.dynConf = dynConf; - this.uri = uri; - this.driver = driver; - this.logSegmentMetadataCache = logSegmentMetadataCache; - this.scheduler = scheduler; - this.statsLogger = statsLogger; - this.perLogStatsLogger = BroadCastStatsLogger.masterslave(perLogStatsLogger, statsLogger); - this.pendingReaders = new PendingReaders(scheduler); - this.regionId = regionId; - this.clientId = clientId; - this.streamIdentifier = conf.getUnpartitionedStreamName(); - this.writeLimiter = writeLimiter; - // Feature Provider - this.featureProvider = featureProvider; - // Failure Injector - this.failureInjector = failureInjector; - // Stats - this.alertStatsLogger = new AlertStatsLogger(this.perLogStatsLogger, "dl_alert"); - this.resourcesCloseable = resourcesCloseable; - } - - @Override - public String getStreamName() { - return name; - } - - @Override - public NamespaceDriver getNamespaceDriver() { - return driver; - } - - URI getUri() { - return uri; - } - - DistributedLogConfiguration getConf() { - return conf; - } - - OrderedScheduler getScheduler() { - return scheduler; - } - - AsyncFailureInjector getFailureInjector() { - return failureInjector; - } - - // - // Test Methods - // - - @VisibleForTesting - LogStreamMetadataStore getWriterMetadataStore() { - return driver.getLogStreamMetadataStore(WRITER); - } - - @VisibleForTesting - LogSegmentEntryStore getReaderEntryStore() { - return driver.getLogSegmentEntryStore(READER); - } - - @VisibleForTesting - FeatureProvider getFeatureProvider() { - return this.featureProvider; - } - - private synchronized BKLogReadHandler getReadHandlerAndRegisterListener( - boolean create, LogSegmentListener listener) { - if (null == readHandlerForListener && create) { - readHandlerForListener = createReadHandler(); - readHandlerForListener.registerListener(listener); - // start fetch the log segments after created the listener - readHandlerForListener.asyncStartFetchLogSegments(); - return readHandlerForListener; - } - if (null != readHandlerForListener && null != listener) { - readHandlerForListener.registerListener(listener); - } - return readHandlerForListener; - } - - @Override - public List<LogSegmentMetadata> getLogSegments() throws IOException { - return FutureUtils.result(getLogSegmentsAsync()); - } - - protected Future<List<LogSegmentMetadata>> getLogSegmentsAsync() { - final BKLogReadHandler readHandler = createReadHandler(); - return readHandler.readLogSegmentsFromStore( - LogSegmentMetadata.COMPARATOR, - LogSegmentFilter.DEFAULT_FILTER, - null) - .map(GetVersionedValueFunction.GET_LOGSEGMENT_LIST_FUNC) - .ensure(CloseAsyncCloseableFunction.of(readHandler)); - } - - @Override - public void registerListener(LogSegmentListener listener) throws IOException { - getReadHandlerAndRegisterListener(true, listener); - } - - @Override - public synchronized void unregisterListener(LogSegmentListener listener) { - if (null != readHandlerForListener) { - readHandlerForListener.unregisterListener(listener); - } - } - - public void checkClosedOrInError(String operation) throws AlreadyClosedException { - synchronized (this) { - if (null != closePromise) { - throw new AlreadyClosedException("Executing " + operation + " on already closed DistributedLogManager"); - } - } - } - - // Create Read Handler - - synchronized BKLogReadHandler createReadHandler() { - Optional<String> subscriberId = Optional.absent(); - return createReadHandler(subscriberId, false); - } - - synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId) { - return createReadHandler(subscriberId, false); - } - - synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, - boolean isHandleForReading) { - return createReadHandler( - subscriberId, - null, - isHandleForReading); - } - - synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, - AsyncNotification notification, - boolean isHandleForReading) { - LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier); - return new BKLogReadHandler( - logMetadata, - subscriberId, - conf, - dynConf, - driver.getLogStreamMetadataStore(READER), - logSegmentMetadataCache, - driver.getLogSegmentEntryStore(READER), - scheduler, - alertStatsLogger, - statsLogger, - perLogStatsLogger, - clientId, - notification, - isHandleForReading); - } - - // Create Ledger Allocator - - - - // Create Write Handler - - public BKLogWriteHandler createWriteHandler(boolean lockHandler) - throws IOException { - return FutureUtils.result(asyncCreateWriteHandler(lockHandler)); - } - - Future<BKLogWriteHandler> asyncCreateWriteHandler(final boolean lockHandler) { - // Fetching Log Metadata (create if not exists) - return driver.getLogStreamMetadataStore(WRITER).getLog( - uri, - name, - true, - conf.getCreateStreamIfNotExists() - ).flatMap(new AbstractFunction1<LogMetadataForWriter, Future<BKLogWriteHandler>>() { - @Override - public Future<BKLogWriteHandler> apply(LogMetadataForWriter logMetadata) { - Promise<BKLogWriteHandler> createPromise = new Promise<BKLogWriteHandler>(); - createWriteHandler(logMetadata, lockHandler, createPromise); - return createPromise; - } - }); - } - - private void createWriteHandler(LogMetadataForWriter logMetadata, - boolean lockHandler, - final Promise<BKLogWriteHandler> createPromise) { - // Build the locks - DistributedLock lock; - if (conf.isWriteLockEnabled()) { - lock = driver.getLogStreamMetadataStore(WRITER).createWriteLock(logMetadata); - } else { - lock = NopDistributedLock.INSTANCE; - } - - Allocator<LogSegmentEntryWriter, Object> segmentAllocator; - try { - segmentAllocator = driver.getLogSegmentEntryStore(WRITER) - .newLogSegmentAllocator(logMetadata, dynConf); - } catch (IOException ioe) { - FutureUtils.setException(createPromise, ioe); - return; - } - - // Make sure writer handler created before resources are initialized - final BKLogWriteHandler writeHandler = new BKLogWriteHandler( - logMetadata, - conf, - driver.getLogStreamMetadataStore(WRITER), - logSegmentMetadataCache, - driver.getLogSegmentEntryStore(WRITER), - scheduler, - segmentAllocator, - statsLogger, - perLogStatsLogger, - alertStatsLogger, - clientId, - regionId, - writeLimiter, - featureProvider, - dynConf, - lock); - if (lockHandler) { - writeHandler.lockHandler().addEventListener(new FutureEventListener<DistributedLock>() { - @Override - public void onSuccess(DistributedLock lock) { - FutureUtils.setValue(createPromise, writeHandler); - } - - @Override - public void onFailure(final Throwable cause) { - writeHandler.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - FutureUtils.setException(createPromise, cause); - return BoxedUnit.UNIT; - } - }); - } - }); - } else { - FutureUtils.setValue(createPromise, writeHandler); - } - } - - PermitManager getLogSegmentRollingPermitManager() { - return driver.getLogStreamMetadataStore(WRITER).getPermitManager(); - } - - <T> Future<T> processReaderOperation(final Function<BKLogReadHandler, Future<T>> func) { - return scheduler.apply(new ExceptionalFunction0<BKLogReadHandler>() { - @Override - public BKLogReadHandler applyE() throws Throwable { - return getReadHandlerAndRegisterListener(true, null); - } - }).flatMap(new ExceptionalFunction<BKLogReadHandler, Future<T>>() { - @Override - public Future<T> applyE(final BKLogReadHandler readHandler) throws Throwable { - return func.apply(readHandler); - } - }); - } - - /** - * Check if an end of stream marker was added to the stream - * A stream with an end of stream marker cannot be appended to - * - * @return true if the marker was added to the stream, false otherwise - */ - @Override - public boolean isEndOfStreamMarked() throws IOException { - checkClosedOrInError("isEndOfStreamMarked"); - long lastTxId = FutureUtils.result(getLastLogRecordAsyncInternal(false, true)).getTransactionId(); - return lastTxId == DistributedLogConstants.MAX_TXID; - } - - /** - * Begin appending to the end of the log stream which is being treated as a sequence of bytes - * - * @return the writer interface to generate log records - */ - public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException { - long position; - try { - position = FutureUtils.result(getLastLogRecordAsyncInternal(true, false)).getTransactionId(); - if (DistributedLogConstants.INVALID_TXID == position || - DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID == position) { - position = 0; - } - } catch (LogEmptyException ex) { - position = 0; - } catch (LogNotFoundException ex) { - position = 0; - } - return new AppendOnlyStreamWriter(startAsyncLogSegmentNonPartitioned(), position); - } - - /** - * Get a reader to read a log stream as a sequence of bytes - * - * @return the writer interface to generate log records - */ - public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException { - return new AppendOnlyStreamReader(this); - } - - /** - * Begin writing to the log stream identified by the name - * - * @return the writer interface to generate log records - */ - @Override - public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException { - checkClosedOrInError("startLogSegmentNonPartitioned"); - BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this); - boolean success = false; - try { - writer.createAndCacheWriteHandler(); - BKLogWriteHandler writeHandler = writer.getWriteHandler(); - FutureUtils.result(writeHandler.lockHandler()); - success = true; - return writer; - } finally { - if (!success) { - writer.abort(); - } - } - } - - /** - * Begin writing to the log stream identified by the name - * - * @return the writer interface to generate log records - */ - @Override - public BKAsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException { - return (BKAsyncLogWriter) FutureUtils.result(openAsyncLogWriter()); - } - - @Override - public Future<AsyncLogWriter> openAsyncLogWriter() { - try { - checkClosedOrInError("startLogSegmentNonPartitioned"); - } catch (AlreadyClosedException e) { - return Future.exception(e); - } - - Future<BKLogWriteHandler> createWriteHandleFuture; - synchronized (this) { - // 1. create the locked write handler - createWriteHandleFuture = asyncCreateWriteHandler(true); - } - return createWriteHandleFuture.flatMap(new AbstractFunction1<BKLogWriteHandler, Future<AsyncLogWriter>>() { - @Override - public Future<AsyncLogWriter> apply(final BKLogWriteHandler writeHandler) { - final BKAsyncLogWriter writer; - synchronized (BKDistributedLogManager.this) { - // 2. create the writer with the handler - writer = new BKAsyncLogWriter( - conf, - dynConf, - BKDistributedLogManager.this, - writeHandler, - featureProvider, - statsLogger); - } - // 3. recover the incomplete log segments - return writeHandler.recoverIncompleteLogSegments() - .map(new AbstractFunction1<Long, AsyncLogWriter>() { - @Override - public AsyncLogWriter apply(Long lastTxId) { - // 4. update last tx id if successfully recovered - writer.setLastTxId(lastTxId); - return writer; - } - }).onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - // 5. close the writer if recovery failed - writer.asyncAbort(); - return BoxedUnit.UNIT; - } - }); - } - }); - } - - @Override - public Future<DLSN> getDLSNNotLessThanTxId(final long fromTxnId) { - return getLogSegmentsAsync().flatMap(new AbstractFunction1<List<LogSegmentMetadata>, Future<DLSN>>() { - @Override - public Future<DLSN> apply(List<LogSegmentMetadata> segments) { - return getDLSNNotLessThanTxId(fromTxnId, segments); - } - }); - } - - private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId, - final List<LogSegmentMetadata> segments) { - if (segments.isEmpty()) { - return getLastDLSNAsync(); - } - final int segmentIdx = DLUtils.findLogSegmentNotLessThanTxnId(segments, fromTxnId); - if (segmentIdx < 0) { - return Future.value(new DLSN(segments.get(0).getLogSegmentSequenceNumber(), 0L, 0L)); - } - return getDLSNNotLessThanTxIdInSegment( - fromTxnId, - segmentIdx, - segments, - driver.getLogSegmentEntryStore(READER) - ); - } - - private Future<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromTxnId, - final int segmentIdx, - final List<LogSegmentMetadata> segments, - final LogSegmentEntryStore entryStore) { - final LogSegmentMetadata segment = segments.get(segmentIdx); - return ReadUtils.getLogRecordNotLessThanTxId( - name, - segment, - fromTxnId, - scheduler, - entryStore, - Math.max(2, dynConf.getReadAheadBatchSize()) - ).flatMap(new AbstractFunction1<Optional<LogRecordWithDLSN>, Future<DLSN>>() { - @Override - public Future<DLSN> apply(Optional<LogRecordWithDLSN> foundRecord) { - if (foundRecord.isPresent()) { - return Future.value(foundRecord.get().getDlsn()); - } - if ((segments.size() - 1) == segmentIdx) { - return getLastLogRecordAsync().map(new AbstractFunction1<LogRecordWithDLSN, DLSN>() { - @Override - public DLSN apply(LogRecordWithDLSN record) { - if (record.getTransactionId() >= fromTxnId) { - return record.getDlsn(); - } - return record.getDlsn().getNextDLSN(); - } - }); - } else { - return getDLSNNotLessThanTxIdInSegment( - fromTxnId, - segmentIdx + 1, - segments, - entryStore); - } - } - }); - } - - /** - * Get the input stream starting with fromTxnId for the specified log - * - * @param fromTxnId - the first transaction id we want to read - * @return the stream starting with transaction fromTxnId - * @throws IOException if a stream cannot be found. - */ - @Override - public LogReader getInputStream(long fromTxnId) - throws IOException { - return getInputStreamInternal(fromTxnId); - } - - @Override - public LogReader getInputStream(DLSN fromDLSN) throws IOException { - return getInputStreamInternal(fromDLSN, Optional.<Long>absent()); - } - - @Override - public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException { - return FutureUtils.result(openAsyncLogReader(fromTxnId)); - } - - /** - * Opening a log reader positioning by transaction id <code>fromTxnId</code>. - * - * <p> - * - retrieve log segments for the stream - * - if the log segment list is empty, positioning by the last dlsn - * - otherwise, find the first log segment that contains the records whose transaction ids are not less than - * the provided transaction id <code>fromTxnId</code> - * - if all log segments' records' transaction ids are more than <code>fromTxnId</code>, positioning - * on the first record. - * - otherwise, search the log segment to find the log record - * - if the log record is found, positioning the reader by that found record's dlsn - * - otherwise, positioning by the last dlsn - * </p> - * - * @see DLUtils#findLogSegmentNotLessThanTxnId(List, long) - * @see ReadUtils#getLogRecordNotLessThanTxId(String, LogSegmentMetadata, long, ExecutorService, LogSegmentEntryStore, int) - * @param fromTxnId - * transaction id to start reading from - * @return future representing the open result. - */ - @Override - public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) { - final Promise<DLSN> dlsnPromise = new Promise<DLSN>(); - getDLSNNotLessThanTxId(fromTxnId).addEventListener(new FutureEventListener<DLSN>() { - - @Override - public void onSuccess(DLSN dlsn) { - dlsnPromise.setValue(dlsn); - } - - @Override - public void onFailure(Throwable cause) { - if (cause instanceof LogEmptyException) { - dlsnPromise.setValue(DLSN.InitialDLSN); - } else { - dlsnPromise.setException(cause); - } - } - }); - return dlsnPromise.flatMap(new AbstractFunction1<DLSN, Future<AsyncLogReader>>() { - @Override - public Future<AsyncLogReader> apply(DLSN dlsn) { - return openAsyncLogReader(dlsn); - } - }); - } - - @Override - public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException { - return FutureUtils.result(openAsyncLogReader(fromDLSN)); - } - - @Override - public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) { - Optional<String> subscriberId = Optional.absent(); - AsyncLogReader reader = new BKAsyncLogReader( - this, - scheduler, - fromDLSN, - subscriberId, - false, - statsLogger); - pendingReaders.add(reader); - return Future.value(reader); - } - - /** - * Note the lock here is a sort of elective exclusive lock. I.e. acquiring this lock will only prevent other - * people who try to acquire the lock from reading from the stream. Normal readers (and writers) will not be - * blocked. - */ - @Override - public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) { - Optional<String> subscriberId = Optional.absent(); - return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId); - } - - @Override - public Future<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN, final String subscriberId) { - return getAsyncLogReaderWithLock(Optional.of(fromDLSN), Optional.of(subscriberId)); - } - - @Override - public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) { - Optional<DLSN> fromDLSN = Optional.absent(); - return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId)); - } - - protected Future<AsyncLogReader> getAsyncLogReaderWithLock(final Optional<DLSN> fromDLSN, - final Optional<String> subscriberId) { - if (!fromDLSN.isPresent() && !subscriberId.isPresent()) { - return Future.exception(new UnexpectedException("Neither from dlsn nor subscriber id is provided.")); - } - final BKAsyncLogReader reader = new BKAsyncLogReader( - BKDistributedLogManager.this, - scheduler, - fromDLSN.isPresent() ? fromDLSN.get() : DLSN.InitialDLSN, - subscriberId, - false, - statsLogger); - pendingReaders.add(reader); - final Future<Void> lockFuture = reader.lockStream(); - final Promise<AsyncLogReader> createPromise = new Promise<AsyncLogReader>( - new Function<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { - // cancel the lock when the creation future is cancelled - lockFuture.cancel(); - return BoxedUnit.UNIT; - } - }); - // lock the stream - fetch the last commit position on success - lockFuture.flatMap(new Function<Void, Future<AsyncLogReader>>() { - @Override - public Future<AsyncLogReader> apply(Void complete) { - if (fromDLSN.isPresent()) { - return Future.value((AsyncLogReader) reader); - } - LOG.info("Reader {} @ {} reading last commit position from subscription store after acquired lock.", - subscriberId.get(), name); - // we acquired lock - final SubscriptionsStore subscriptionsStore = driver.getSubscriptionsStore(getStreamName()); - return subscriptionsStore.getLastCommitPosition(subscriberId.get()) - .map(new ExceptionalFunction<DLSN, AsyncLogReader>() { - @Override - public AsyncLogReader applyE(DLSN lastCommitPosition) throws UnexpectedException { - LOG.info("Reader {} @ {} positioned to last commit position {}.", - new Object[] { subscriberId.get(), name, lastCommitPosition }); - reader.setStartDLSN(lastCommitPosition); - return reader; - } - }); - } - }).addEventListener(new FutureEventListener<AsyncLogReader>() { - @Override - public void onSuccess(AsyncLogReader r) { - pendingReaders.remove(reader); - FutureUtils.setValue(createPromise, r); - } - - @Override - public void onFailure(final Throwable cause) { - pendingReaders.remove(reader); - reader.asyncClose().ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - FutureUtils.setException(createPromise, cause); - return BoxedUnit.UNIT; - } - }); - } - }); - return createPromise; - } - - /** - * Get the input stream starting with fromTxnId for the specified log - * - * @param fromTxnId - * transaction id to start reading from - * @return log reader - * @throws IOException - */ - LogReader getInputStreamInternal(long fromTxnId) - throws IOException { - DLSN fromDLSN; - try { - fromDLSN = FutureUtils.result(getDLSNNotLessThanTxId(fromTxnId)); - } catch (LogEmptyException lee) { - fromDLSN = DLSN.InitialDLSN; - } - return getInputStreamInternal(fromDLSN, Optional.of(fromTxnId)); - } - - LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId) - throws IOException { - LOG.info("Create sync reader starting from {}", fromDLSN); - checkClosedOrInError("getInputStream"); - return new BKSyncLogReader( - conf, - this, - fromDLSN, - fromTxnId, - statsLogger); - } - - /** - * Get the last log record in the stream - * - * @return the last log record in the stream - * @throws java.io.IOException if a stream cannot be found. - */ - @Override - public LogRecordWithDLSN getLastLogRecord() throws IOException { - checkClosedOrInError("getLastLogRecord"); - return FutureUtils.result(getLastLogRecordAsync()); - } - - @Override - public long getFirstTxId() throws IOException { - checkClosedOrInError("getFirstTxId"); - return FutureUtils.result(getFirstRecordAsyncInternal()).getTransactionId(); - } - - @Override - public long getLastTxId() throws IOException { - checkClosedOrInError("getLastTxId"); - return FutureUtils.result(getLastTxIdAsync()); - } - - @Override - public DLSN getLastDLSN() throws IOException { - checkClosedOrInError("getLastDLSN"); - return FutureUtils.result(getLastLogRecordAsyncInternal(false, false)).getDlsn(); - } - - /** - * Get Latest log record in the log - * - * @return latest log record - */ - @Override - public Future<LogRecordWithDLSN> getLastLogRecordAsync() { - return getLastLogRecordAsyncInternal(false, false); - } - - private Future<LogRecordWithDLSN> getLastLogRecordAsyncInternal(final boolean recover, - final boolean includeEndOfStream) { - return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() { - @Override - public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) { - return ledgerHandler.getLastLogRecordAsync(recover, includeEndOfStream); - } - }); - } - - /** - * Get Latest Transaction Id in the log - * - * @return latest transaction id - */ - @Override - public Future<Long> getLastTxIdAsync() { - return getLastLogRecordAsyncInternal(false, false) - .map(RECORD_2_TXID_FUNCTION); - } - - /** - * Get first DLSN in the log. - * - * @return first dlsn in the stream - */ - @Override - public Future<DLSN> getFirstDLSNAsync() { - return getFirstRecordAsyncInternal().map(RECORD_2_DLSN_FUNCTION); - } - - private Future<LogRecordWithDLSN> getFirstRecordAsyncInternal() { - return processReaderOperation(new Function<BKLogReadHandler, Future<LogRecordWithDLSN>>() { - @Override - public Future<LogRecordWithDLSN> apply(final BKLogReadHandler ledgerHandler) { - return ledgerHandler.asyncGetFirstLogRecord(); - } - }); - } - - /** - * Get Latest DLSN in the log. - * - * @return latest transaction id - */ - @Override - public Future<DLSN> getLastDLSNAsync() { - return getLastLogRecordAsyncInternal(false, false) - .map(RECORD_2_DLSN_FUNCTION); - } - - /** - * Get the number of log records in the active portion of the log - * Any log segments that have already been truncated will not be included - * - * @return number of log records - * @throws IOException - */ - @Override - public long getLogRecordCount() throws IOException { - checkClosedOrInError("getLogRecordCount"); - return FutureUtils.result(getLogRecordCountAsync(DLSN.InitialDLSN)); - } - - /** - * Get the number of log records in the active portion of the log asynchronously. - * Any log segments that have already been truncated will not be included - * - * @return future number of log records - * @throws IOException - */ - @Override - public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN) { - return processReaderOperation(new Function<BKLogReadHandler, Future<Long>>() { - @Override - public Future<Long> apply(BKLogReadHandler ledgerHandler) { - return ledgerHandler.asyncGetLogRecordCount(beginDLSN); - } - }); - } - - @Override - public void recover() throws IOException { - recoverInternal(conf.getUnpartitionedStreamName()); - } - - /** - * Recover a specified stream within the log container - * The writer implicitly recovers a topic when it resumes writing. - * This allows applications to recover a container explicitly so - * that application may read a fully recovered log before resuming - * the writes - * - * @throws IOException if the recovery fails - */ - private void recoverInternal(String streamIdentifier) throws IOException { - checkClosedOrInError("recoverInternal"); - BKLogWriteHandler ledgerHandler = createWriteHandler(true); - try { - FutureUtils.result(ledgerHandler.recoverIncompleteLogSegments()); - } finally { - Utils.closeQuietly(ledgerHandler); - } - } - - /** - * Delete all the partitions of the specified log - * - * @throws IOException if the deletion fails - */ - @Override - public void delete() throws IOException { - FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) - .deleteLog(uri, getStreamName())); - } - - /** - * The DistributedLogManager may archive/purge any logs for transactionId - * less than or equal to minImageTxId. - * This is to be used only when the client explicitly manages deletion. If - * the cleanup policy is based on sliding time window, then this method need - * not be called. - * - * @param minTxIdToKeep the earliest txid that must be retained - * @throws IOException if purging fails - */ - @Override - public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { - Preconditions.checkArgument(minTxIdToKeep > 0, "Invalid transaction id " + minTxIdToKeep); - checkClosedOrInError("purgeLogSegmentsOlderThan"); - BKLogWriteHandler ledgerHandler = createWriteHandler(true); - try { - LOG.info("Purging logs for {} older than {}", ledgerHandler.getFullyQualifiedName(), minTxIdToKeep); - FutureUtils.result(ledgerHandler.purgeLogSegmentsOlderThanTxnId(minTxIdToKeep)); - } finally { - Utils.closeQuietly(ledgerHandler); - } - } - - static class PendingReaders implements AsyncCloseable { - - final ExecutorService executorService; - final Set<AsyncCloseable> readers = new HashSet<AsyncCloseable>(); - - PendingReaders(ExecutorService executorService) { - this.executorService = executorService; - } - - public synchronized void remove(AsyncCloseable reader) { - readers.remove(reader); - } - - public synchronized void add(AsyncCloseable reader) { - readers.add(reader); - } - - @Override - public Future<Void> asyncClose() { - return Utils.closeSequence(executorService, true, readers.toArray(new AsyncLogReader[readers.size()])) - .onSuccess(new AbstractFunction1<Void, BoxedUnit>() { - @Override - public BoxedUnit apply(Void value) { - readers.clear(); - return BoxedUnit.UNIT; - } - }); - } - }; - - /** - * Close the distributed log manager, freeing any resources it may hold. - */ - @Override - public Future<Void> asyncClose() { - Promise<Void> closeFuture; - BKLogReadHandler readHandlerToClose; - synchronized (this) { - if (null != closePromise) { - return closePromise; - } - closeFuture = closePromise = new Promise<Void>(); - readHandlerToClose = readHandlerForListener; - } - - Future<Void> closeResult = Utils.closeSequence(null, true, - readHandlerToClose, - pendingReaders, - resourcesCloseable.or(AsyncCloseable.NULL)); - closeResult.proxyTo(closeFuture); - return closeFuture; - } - - @Override - public void close() throws IOException { - FutureUtils.result(asyncClose()); - } - - @Override - public String toString() { - return String.format("DLM:%s:%s", getUri(), getStreamName()); - } - - public void raiseAlert(String msg, Object... args) { - alertStatsLogger.raise(msg, args); - } - - @Override - public SubscriptionsStore getSubscriptionsStore() { - return driver.getSubscriptionsStore(getStreamName()); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java deleted file mode 100644 index a8b1f77..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.base.Optional; -import com.google.common.base.Ticker; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.callback.NamespaceListener; -import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; -import com.twitter.distributedlog.exceptions.AlreadyClosedException; -import com.twitter.distributedlog.exceptions.InvalidStreamNameException; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.io.AsyncCloseable; -import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.NamespaceDriver; -import com.twitter.distributedlog.util.ConfUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.MonitoredScheduledThreadPoolExecutor; -import com.twitter.distributedlog.util.OrderedScheduler; -import com.twitter.distributedlog.util.PermitLimiter; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.distributedlog.util.Utils; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.Iterator; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static com.twitter.distributedlog.namespace.NamespaceDriver.Role.WRITER; -import static com.twitter.distributedlog.util.DLUtils.validateName; - -/** - * BKDistributedLogNamespace is the default implementation of {@link DistributedLogNamespace}. It uses - * zookeeper for metadata storage and bookkeeper for data storage. - * <h3>Metrics</h3> - * - * <h4>ZooKeeper Client</h4> - * See {@link ZooKeeperClient} for detail sub-stats. - * <ul> - * <li> `scope`/dlzk_factory_writer_shared/* : stats about the zookeeper client shared by all DL writers. - * <li> `scope`/dlzk_factory_reader_shared/* : stats about the zookeeper client shared by all DL readers. - * <li> `scope`/bkzk_factory_writer_shared/* : stats about the zookeeper client used by bookkeeper client - * shared by all DL writers. - * <li> `scope`/bkzk_factory_reader_shared/* : stats about the zookeeper client used by bookkeeper client - * shared by all DL readers. - * </ul> - * - * <h4>BookKeeper Client</h4> - * BookKeeper client stats are exposed directly to current scope. See {@link BookKeeperClient} for detail stats. - * - * <h4>Utils</h4> - * <ul> - * <li> `scope`/factory/thread_pool/* : stats about the ordered scheduler used by this namespace. - * See {@link OrderedScheduler}. - * <li> `scope`/factory/readahead_thread_pool/* : stats about the readahead thread pool executor - * used by this namespace. See {@link MonitoredScheduledThreadPoolExecutor}. - * <li> `scope`/writeLimiter/* : stats about the global write limiter used by this namespace. - * See {@link PermitLimiter}. - * </ul> - * - * <h4>DistributedLogManager</h4> - * - * All the core stats about reader and writer are exposed under current scope via {@link BKDistributedLogManager}. - */ -public class BKDistributedLogNamespace implements DistributedLogNamespace { - static final Logger LOG = LoggerFactory.getLogger(BKDistributedLogNamespace.class); - - private final String clientId; - private final int regionId; - private final DistributedLogConfiguration conf; - private final URI namespace; - // namespace driver - private final NamespaceDriver driver; - // resources - private final OrderedScheduler scheduler; - private final PermitLimiter writeLimiter; - private final AsyncFailureInjector failureInjector; - // log segment metadata store - private final LogSegmentMetadataCache logSegmentMetadataCache; - // feature provider - private final FeatureProvider featureProvider; - // Stats Loggers - private final StatsLogger statsLogger; - private final StatsLogger perLogStatsLogger; - - protected final AtomicBoolean closed = new AtomicBoolean(false); - - public BKDistributedLogNamespace( - DistributedLogConfiguration conf, - URI uri, - NamespaceDriver driver, - OrderedScheduler scheduler, - FeatureProvider featureProvider, - PermitLimiter writeLimiter, - AsyncFailureInjector failureInjector, - StatsLogger statsLogger, - StatsLogger perLogStatsLogger, - String clientId, - int regionId) { - this.conf = conf; - this.namespace = uri; - this.driver = driver; - this.scheduler = scheduler; - this.featureProvider = featureProvider; - this.writeLimiter = writeLimiter; - this.failureInjector = failureInjector; - this.statsLogger = statsLogger; - this.perLogStatsLogger = perLogStatsLogger; - this.clientId = clientId; - this.regionId = regionId; - - // create a log segment metadata cache - this.logSegmentMetadataCache = new LogSegmentMetadataCache(conf, Ticker.systemTicker()); - } - - @Override - public NamespaceDriver getNamespaceDriver() { - return driver; - } - - // - // Namespace Methods - // - - @Override - public void createLog(String logName) - throws InvalidStreamNameException, IOException { - checkState(); - validateName(logName); - URI uri = FutureUtils.result(driver.getLogMetadataStore().createLog(logName)); - FutureUtils.result(driver.getLogStreamMetadataStore(WRITER).getLog(uri, logName, true, true)); - } - - @Override - public void deleteLog(String logName) - throws InvalidStreamNameException, LogNotFoundException, IOException { - checkState(); - validateName(logName); - Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); - if (!uri.isPresent()) { - throw new LogNotFoundException("Log " + logName + " isn't found."); - } - DistributedLogManager dlm = openLogInternal( - uri.get(), - logName, - Optional.<DistributedLogConfiguration>absent(), - Optional.<DynamicDistributedLogConfiguration>absent()); - dlm.delete(); - } - - @Override - public DistributedLogManager openLog(String logName) - throws InvalidStreamNameException, IOException { - return openLog(logName, - Optional.<DistributedLogConfiguration>absent(), - Optional.<DynamicDistributedLogConfiguration>absent(), - Optional.<StatsLogger>absent()); - } - - @Override - public DistributedLogManager openLog(String logName, - Optional<DistributedLogConfiguration> logConf, - Optional<DynamicDistributedLogConfiguration> dynamicLogConf, - Optional<StatsLogger> perStreamStatsLogger) - throws InvalidStreamNameException, IOException { - checkState(); - validateName(logName); - Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); - if (!uri.isPresent()) { - throw new LogNotFoundException("Log " + logName + " isn't found."); - } - return openLogInternal( - uri.get(), - logName, - logConf, - dynamicLogConf); - } - - @Override - public boolean logExists(String logName) - throws IOException, IllegalArgumentException { - checkState(); - Optional<URI> uri = FutureUtils.result(driver.getLogMetadataStore().getLogLocation(logName)); - if (uri.isPresent()) { - try { - FutureUtils.result(driver.getLogStreamMetadataStore(WRITER) - .logExists(uri.get(), logName)); - return true; - } catch (LogNotFoundException lnfe) { - return false; - } - } else { - return false; - } - } - - @Override - public Iterator<String> getLogs() throws IOException { - checkState(); - return FutureUtils.result(driver.getLogMetadataStore().getLogs()); - } - - @Override - public void registerNamespaceListener(NamespaceListener listener) { - driver.getLogMetadataStore().registerNamespaceListener(listener); - } - - @Override - public synchronized AccessControlManager createAccessControlManager() throws IOException { - checkState(); - return driver.getAccessControlManager(); - } - - /** - * Open the log in location <i>uri</i>. - * - * @param uri - * location to store the log - * @param nameOfLogStream - * name of the log - * @param logConfiguration - * optional stream configuration - * @param dynamicLogConfiguration - * dynamic stream configuration overrides. - * @return distributedlog manager instance. - * @throws InvalidStreamNameException if the stream name is invalid - * @throws IOException - */ - protected DistributedLogManager openLogInternal( - URI uri, - String nameOfLogStream, - Optional<DistributedLogConfiguration> logConfiguration, - Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration) - throws InvalidStreamNameException, IOException { - // Make sure the name is well formed - checkState(); - validateName(nameOfLogStream); - - DistributedLogConfiguration mergedConfiguration = new DistributedLogConfiguration(); - mergedConfiguration.addConfiguration(conf); - mergedConfiguration.loadStreamConf(logConfiguration); - // If dynamic config was not provided, default to a static view of the global configuration. - DynamicDistributedLogConfiguration dynConf = null; - if (dynamicLogConfiguration.isPresent()) { - dynConf = dynamicLogConfiguration.get(); - } else { - dynConf = ConfUtils.getConstDynConf(mergedConfiguration); - } - - return new BKDistributedLogManager( - nameOfLogStream, /* Log Name */ - mergedConfiguration, /* Configuration */ - dynConf, /* Dynamic Configuration */ - uri, /* Namespace URI */ - driver, /* Namespace Driver */ - logSegmentMetadataCache, /* Log Segment Metadata Cache */ - scheduler, /* DL scheduler */ - clientId, /* Client Id */ - regionId, /* Region Id */ - writeLimiter, /* Write Limiter */ - featureProvider.scope("dl"), /* Feature Provider */ - failureInjector, /* Failure Injector */ - statsLogger, /* Stats Logger */ - perLogStatsLogger, /* Per Log Stats Logger */ - Optional.<AsyncCloseable>absent() /* shared resources, we don't need to close any resources in dlm */ - ); - } - - /** - * Check the namespace state. - * - * @throws IOException - */ - private void checkState() throws IOException { - if (closed.get()) { - LOG.error("BK namespace {} is already closed", namespace); - throw new AlreadyClosedException("BK namespace " + namespace + " is already closed"); - } - } - - /** - * Close the distributed log manager factory, freeing any resources it may hold. - */ - @Override - public void close() { - if (!closed.compareAndSet(false, true)) { - return; - } - // shutdown the driver - Utils.close(driver); - // close the write limiter - this.writeLimiter.close(); - // Shutdown the schedulers - SchedulerUtils.shutdownScheduler(scheduler, conf.getSchedulerShutdownTimeoutMs(), - TimeUnit.MILLISECONDS); - LOG.info("Executor Service Stopped."); - } -}