http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java deleted file mode 100644 index 8a4a30b..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogWriter.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import org.apache.distributedlog.io.Abortable; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/* -* A generic interface class to support writing log records into -* a persistent distributed log. -*/ -public interface LogWriter extends Closeable, Abortable { - /** - * Write a log record to the stream. - * - * @param record single log record - * @throws IOException - */ - public void write(LogRecord record) throws IOException; - - - /** - * Write a list of log records to the stream. - * - * @param records list of log records - * @throws IOException - */ - @Deprecated - public int writeBulk(List<LogRecord> records) throws IOException; - - /** - * All data that has been written to the stream so far will be sent to - * persistent storage. - * The transmission is asynchronous and new data can be still written to the - * stream while flushing is performed. - * - * TODO: rename this to flush() - */ - public long setReadyToFlush() throws IOException; - - /** - * Flush and sync all data that is ready to be flush - * {@link #setReadyToFlush()} into underlying persistent store. - * @throws IOException - * - * TODO: rename this to commit() - */ - public long flushAndSync() throws IOException; - - /** - * Flushes all the data up to this point, - * adds the end of stream marker and marks the stream - * as read-only in the metadata. No appends to the - * stream will be allowed after this point - * - * @throws IOException - */ - public void markEndOfStream() throws IOException; - -}
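The diffs below apply one migration pattern throughout: com.twitter.util primitives are replaced with Java 8 equivalents. Promise becomes CompletableFuture, setValue/setException become complete/completeExceptionally, addEventListener becomes whenComplete/whenCompleteAsync, flatMap/map become thenCompose/thenApply, and ensure/proxyTo/collect move to the project's FutureUtils helpers. The self-contained sketch that follows illustrates the promise-style idiom ReadUtils uses after the change (complete a CompletableFuture from an executor-bound callback and release the reader once the promise finishes); the class and interface names in it are hypothetical stand-ins, not part of this patch.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FutureMigrationSketch {

    // Hypothetical stand-in for a log segment entry reader; not part of the patch.
    interface EntryReader {
        CompletableFuture<String> readEntries(long startEntryId, long endEntryId);
        void asyncClose();
    }

    // Promise-style composition after the migration: create a CompletableFuture,
    // complete it from a callback scheduled on an executor, and close the reader
    // once the promise finishes either way (the roles previously played by
    // Promise.setValue/setException and Future.ensure).
    static CompletableFuture<String> readFirstEntry(EntryReader reader, ExecutorService executor) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // close the reader regardless of how the promise completes
        promise.whenComplete((value, cause) -> reader.asyncClose());
        reader.readEntries(0L, 0L).whenCompleteAsync((record, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);  // was promise.setException(cause)
            } else {
                promise.complete(record);              // was promise.setValue(record)
            }
        }, executor);
        return promise;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        EntryReader reader = new EntryReader() {
            public CompletableFuture<String> readEntries(long startEntryId, long endEntryId) {
                return CompletableFuture.supplyAsync(() -> "entry-" + startEntryId);
            }
            public void asyncClose() {
                System.out.println("reader closed");
            }
        };
        System.out.println(readFirstEntry(reader, executor).get());
        executor.shutdown();
    }
}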
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java deleted file mode 100644 index 3d1d601..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/MetadataAccessor.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog; - -import org.apache.distributedlog.io.AsyncCloseable; - -import java.io.Closeable; -import java.io.IOException; - -public interface MetadataAccessor extends Closeable, AsyncCloseable { - /** - * Get the name of the stream managed by this log manager - * @return streamName - */ - public String getStreamName(); - - public void createOrUpdateMetadata(byte[] metadata) throws IOException; - - public void deleteMetadata() throws IOException; - - public byte[] getMetadata() throws IOException; - - /** - * Close the distributed log metadata, freeing any resources it may hold. 
- */ - public void close() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java index f94a6e0..386a9a1 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java @@ -21,6 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.function.Function; import org.apache.distributedlog.callback.LogSegmentListener; import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException; import org.apache.distributedlog.exceptions.DLIllegalStateException; @@ -32,19 +35,13 @@ import org.apache.distributedlog.io.AsyncCloseable; import org.apache.distributedlog.logsegment.LogSegmentEntryReader; import org.apache.distributedlog.logsegment.LogSegmentEntryStore; import org.apache.distributedlog.logsegment.LogSegmentFilter; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import com.twitter.util.Function0; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Futures; -import com.twitter.util.Promise; import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.versioning.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Function1; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.LinkedList; @@ -52,7 +49,6 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -75,12 +71,9 @@ class ReadAheadEntryReader implements // Static Functions // - private static AbstractFunction1<LogSegmentEntryReader, BoxedUnit> START_READER_FUNC = new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() { - @Override - public BoxedUnit apply(LogSegmentEntryReader reader) { - reader.start(); - return BoxedUnit.UNIT; - } + private static Function<LogSegmentEntryReader, Void> START_READER_FUNC = reader -> { + reader.start(); + return null; }; // @@ -91,7 +84,7 @@ class ReadAheadEntryReader implements private LogSegmentMetadata metadata; private final long startEntryId; - private Future<LogSegmentEntryReader> openFuture = null; + private CompletableFuture<LogSegmentEntryReader> openFuture = null; private LogSegmentEntryReader reader = null; private boolean isStarted = false; private boolean isClosed = false; @@ -122,7 +115,7 @@ class ReadAheadEntryReader implements if (null != openFuture) { return; } - openFuture = entryStore.openReader(metadata, 
startEntryId).addEventListener(this); + openFuture = entryStore.openReader(metadata, startEntryId).whenComplete(this); } synchronized boolean isReaderStarted() { @@ -137,16 +130,16 @@ class ReadAheadEntryReader implements if (null != reader) { reader.start(); } else { - openFuture.onSuccess(START_READER_FUNC); + openFuture.thenApply(START_READER_FUNC); } } - synchronized Future<List<Entry.Reader>> readNext() { + synchronized CompletableFuture<List<Entry.Reader>> readNext() { if (null != reader) { checkCatchingUpStatus(reader); return reader.readNext(numReadAheadEntries); } else { - return openFuture.flatMap(readFunc); + return openFuture.thenCompose(readFunc); } } @@ -155,14 +148,10 @@ class ReadAheadEntryReader implements reader.onLogSegmentMetadataUpdated(segment); this.metadata = segment; } else { - openFuture.onSuccess(new AbstractFunction1<LogSegmentEntryReader, BoxedUnit>() { - @Override - public BoxedUnit apply(LogSegmentEntryReader reader) { - reader.onLogSegmentMetadataUpdated(segment); - synchronized (SegmentReader.this) { - SegmentReader.this.metadata = segment; - } - return BoxedUnit.UNIT; + openFuture.thenAccept(reader1 -> { + reader1.onLogSegmentMetadataUpdated(segment); + synchronized (SegmentReader.this) { + SegmentReader.this.metadata = segment; } }); } @@ -185,28 +174,21 @@ class ReadAheadEntryReader implements return isClosed; } - synchronized Future<Void> close() { + synchronized CompletableFuture<Void> close() { if (null == openFuture) { - return Future.Void(); + return FutureUtils.Void(); } - return openFuture.flatMap(new AbstractFunction1<LogSegmentEntryReader, Future<Void>>() { - @Override - public Future<Void> apply(LogSegmentEntryReader reader) { - return reader.asyncClose(); - } - }).ensure(new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { + return FutureUtils.ensure( + openFuture.thenCompose(reader1 -> reader1.asyncClose()), + () -> { synchronized (SegmentReader.this) { isClosed = true; } - return null; - } - }); + }); } } - private class ReadEntriesFunc extends AbstractFunction1<LogSegmentEntryReader, Future<List<Entry.Reader>>> { + private class ReadEntriesFunc implements Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> { private final int numEntries; @@ -215,7 +197,7 @@ class ReadAheadEntryReader implements } @Override - public Future<List<Entry.Reader>> apply(LogSegmentEntryReader reader) { + public CompletableFuture<List<Entry.Reader>> apply(LogSegmentEntryReader reader) { checkCatchingUpStatus(reader); return reader.readNext(numEntries); } @@ -244,14 +226,8 @@ class ReadAheadEntryReader implements // // Functions // - private final Function1<LogSegmentEntryReader, Future<List<Entry.Reader>>> readFunc; - private final Function0<BoxedUnit> removeClosedSegmentReadersFunc = new Function0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - removeClosedSegmentReaders(); - return BoxedUnit.UNIT; - } - }; + private final Function<LogSegmentEntryReader, CompletableFuture<List<Entry.Reader>>> readFunc; + private final Runnable removeClosedSegmentReadersFunc = () -> removeClosedSegmentReaders(); // // Resources @@ -282,7 +258,7 @@ class ReadAheadEntryReader implements private final AtomicBoolean started = new AtomicBoolean(false); private boolean isInitialized = false; private boolean readAheadPaused = false; - private Promise<Void> closePromise = null; + private CompletableFuture<Void> closePromise = null; // segment readers private long currentSegmentSequenceNumber; private SegmentReader currentSegmentReader; @@ 
-344,15 +320,12 @@ class ReadAheadEntryReader implements private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { if (idleWarnThresholdMillis < Integer.MAX_VALUE && idleWarnThresholdMillis > 0) { - return scheduler.scheduleAtFixedRate(streamName, new Runnable() { - @Override - public void run() { - if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) { - return; - } - // the readahead has been idle - unsafeCheckIfReadAheadIsIdle(); + return scheduler.scheduleAtFixedRate(streamName, () -> { + if (!isReaderIdle(idleWarnThresholdMillis, TimeUnit.MILLISECONDS)) { + return; } + // the readahead has been idle + unsafeCheckIfReadAheadIsIdle(); }, idleWarnThresholdMillis, idleWarnThresholdMillis, TimeUnit.MILLISECONDS); } return null; @@ -366,7 +339,7 @@ class ReadAheadEntryReader implements LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER, null - ).addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { + ).whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() { @Override public void onFailure(Throwable cause) { // do nothing here since it would be retried on next idle reader check task @@ -459,13 +432,13 @@ class ReadAheadEntryReader implements } @Override - public Future<Void> asyncClose() { - final Promise<Void> closeFuture; + public CompletableFuture<Void> asyncClose() { + final CompletableFuture<Void> closeFuture; synchronized (this) { if (null != closePromise) { return closePromise; } - closePromise = closeFuture = new Promise<Void>(); + closePromise = closeFuture = new CompletableFuture<Void>(); } // cancel the idle reader task @@ -489,8 +462,8 @@ class ReadAheadEntryReader implements return closeFuture; } - private void unsafeAsyncClose(Promise<Void> closePromise) { - List<Future<Void>> closeFutures = Lists.newArrayListWithExpectedSize( + private void unsafeAsyncClose(CompletableFuture<Void> closePromise) { + List<CompletableFuture<Void>> closeFutures = Lists.newArrayListWithExpectedSize( segmentReaders.size() + segmentReadersToClose.size() + 1); if (null != currentSegmentReader) { segmentReadersToClose.add(currentSegmentReader); @@ -505,7 +478,9 @@ class ReadAheadEntryReader implements for (SegmentReader reader : segmentReadersToClose) { closeFutures.add(reader.close()); } - Futures.collect(closeFutures).proxyTo(closePromise); + FutureUtils.proxyTo( + FutureUtils.collect(closeFutures).thenApply((value) -> null), + closePromise); } // @@ -921,7 +896,9 @@ class ReadAheadEntryReader implements private void unsafeMoveToNextLogSegment() { if (null != currentSegmentReader) { segmentReadersToClose.add(currentSegmentReader); - currentSegmentReader.close().ensure(removeClosedSegmentReadersFunc); + FutureUtils.ensure( + currentSegmentReader.close(), + removeClosedSegmentReadersFunc); logger.debug("close current segment reader {}", currentSegmentReader.getSegment()); currentSegmentReader = null; } @@ -971,7 +948,7 @@ class ReadAheadEntryReader implements } private void unsafeReadNext(SegmentReader reader) { - reader.readNext().addEventListener(this); + reader.readNext().whenComplete(this); } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java index 9935d5f..bf4e140 100644 --- 
a/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/ReadUtils.java @@ -19,6 +19,7 @@ package org.apache.distributedlog; import java.io.IOException; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -31,14 +32,10 @@ import org.apache.distributedlog.selector.FirstDLSNNotLessThanSelector; import org.apache.distributedlog.selector.FirstTxIdNotLessThanSelector; import org.apache.distributedlog.selector.LastRecordSelector; import org.apache.distributedlog.selector.LogRecordSelector; -import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; /** * Utility function for readers @@ -78,7 +75,7 @@ public class ReadUtils { * log segment entry store * @return a future with last record. */ - public static Future<LogRecordWithDLSN> asyncReadLastRecord( + public static CompletableFuture<LogRecordWithDLSN> asyncReadLastRecord( final String streamName, final LogSegmentMetadata l, final boolean fence, @@ -116,7 +113,7 @@ public class ReadUtils { * threshold dlsn * @return a future with last record. */ - public static Future<LogRecordWithDLSN> asyncReadFirstUserRecord( + public static CompletableFuture<LogRecordWithDLSN> asyncReadFirstUserRecord( final String streamName, final LogSegmentMetadata l, final int scanStartBatchSize, @@ -243,14 +240,14 @@ public class ReadUtils { * scan context * @return a future with the log record. */ - private static Future<LogRecordWithDLSN> asyncReadRecordFromEntries( + private static CompletableFuture<LogRecordWithDLSN> asyncReadRecordFromEntries( final String streamName, final LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, final ExecutorService executorService, final ScanContext context, final LogRecordSelector selector) { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); + final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>(); final long startEntryId = context.curStartEntryId.get(); final long endEntryId = context.curEndEntryId.get(); if (LOG.isDebugEnabled()) { @@ -271,7 +268,7 @@ public class ReadUtils { } catch (IOException ioe) { // exception is only thrown due to bad ledger entry, so it might be corrupted // we shouldn't do anything beyond this point. 
throw the exception to application - promise.setException(ioe); + promise.completeExceptionally(ioe); return; } } @@ -282,16 +279,16 @@ public class ReadUtils { new Object[]{streamName, startEntryId, endEntryId, metadata, record}); } - promise.setValue(record); + promise.complete(record); } @Override public void onFailure(final Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }; reader.readEntries(startEntryId, endEntryId) - .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); + .whenCompleteAsync(readEntriesListener, executorService); return promise; } @@ -343,7 +340,7 @@ public class ReadUtils { final LogSegmentRandomAccessEntryReader reader, final LogSegmentMetadata metadata, final ExecutorService executorService, - final Promise<LogRecordWithDLSN> promise, + final CompletableFuture<LogRecordWithDLSN> promise, final ScanContext context, final LogRecordSelector selector) { FutureEventListener<LogRecordWithDLSN> readEntriesListener = @@ -356,12 +353,12 @@ public class ReadUtils { metadata, value}); } if (null != value) { - promise.setValue(value); + promise.complete(value); return; } if (!context.moveToNextRange()) { // no entries to read again - promise.setValue(null); + promise.complete(null); return; } // scan next range @@ -376,11 +373,11 @@ public class ReadUtils { @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }; asyncReadRecordFromEntries(streamName, reader, metadata, executorService, context, selector) - .addEventListener(FutureEventListenerRunnable.of(readEntriesListener, executorService)); + .whenCompleteAsync(readEntriesListener, executorService); } private static void asyncReadRecordFromLogSegment( @@ -392,7 +389,7 @@ public class ReadUtils { final int scanMaxBatchSize, final boolean includeControl, final boolean includeEndOfStream, - final Promise<LogRecordWithDLSN> promise, + final CompletableFuture<LogRecordWithDLSN> promise, final AtomicInteger numRecordsScanned, final LogRecordSelector selector, final boolean backward, @@ -402,7 +399,7 @@ public class ReadUtils { if (LOG.isDebugEnabled()) { LOG.debug("Log segment {} is empty for {}.", new Object[] { metadata, streamName }); } - promise.setValue(null); + promise.complete(null); return; } final ScanContext context = new ScanContext( @@ -413,7 +410,7 @@ public class ReadUtils { promise, context, selector); } - private static Future<LogRecordWithDLSN> asyncReadRecord( + private static CompletableFuture<LogRecordWithDLSN> asyncReadRecord( final String streamName, final LogSegmentMetadata l, final boolean fence, @@ -428,7 +425,7 @@ public class ReadUtils { final boolean backward, final long startEntryId) { - final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>(); + final CompletableFuture<LogRecordWithDLSN> promise = new CompletableFuture<LogRecordWithDLSN>(); FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = new FutureEventListener<LogSegmentRandomAccessEntryReader>() { @@ -438,13 +435,7 @@ public class ReadUtils { LOG.debug("{} Opened log segment {} for reading record", streamName, l); } - promise.ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - reader.asyncClose(); - return BoxedUnit.UNIT; - } - }); + promise.whenComplete((value, cause) -> reader.asyncClose()); if (LOG.isDebugEnabled()) { LOG.debug("{} {} scanning {}.", new Object[]{ (backward ? 
"backward" : "forward"), streamName, l}); @@ -458,11 +449,11 @@ public class ReadUtils { @Override public void onFailure(final Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }; entryStore.openRandomAccessReader(l, fence) - .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); + .whenCompleteAsync(openReaderListener, executorService); return promise; } @@ -499,7 +490,7 @@ public class ReadUtils { * how many number of entries to search in parallel * @return found log record. none if all transaction ids are less than provided <code>transactionId</code>. */ - public static Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( + public static CompletableFuture<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId( final String logName, final LogSegmentMetadata segment, final long transactionId, @@ -511,30 +502,23 @@ public class ReadUtils { // all log records whose transaction id is less than provided transactionId // then return none Optional<LogRecordWithDLSN> noneRecord = Optional.absent(); - return Future.value(noneRecord); + return FutureUtils.value(noneRecord); } } - final Promise<Optional<LogRecordWithDLSN>> promise = - new Promise<Optional<LogRecordWithDLSN>>(); + final CompletableFuture<Optional<LogRecordWithDLSN>> promise = + new CompletableFuture<Optional<LogRecordWithDLSN>>(); final FutureEventListener<LogSegmentRandomAccessEntryReader> openReaderListener = new FutureEventListener<LogSegmentRandomAccessEntryReader>() { @Override public void onSuccess(final LogSegmentRandomAccessEntryReader reader) { - promise.ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - reader.asyncClose(); - return BoxedUnit.UNIT; - } - - }); + promise.whenComplete((value, cause) -> reader.asyncClose()); long lastEntryId = reader.getLastAddConfirmed(); if (lastEntryId < 0) { // it means that the log segment is created but not written yet or an empty log segment. 
// it is equivalent to 'all log records whose transaction id is less than provided transactionId' Optional<LogRecordWithDLSN> nonRecord = Optional.absent(); - promise.setValue(nonRecord); + promise.complete(nonRecord); return; } // all log records whose transaction id is not less than provided transactionId @@ -548,15 +532,15 @@ public class ReadUtils { executorService, new SingleEntryScanContext(0L), selector - ).addEventListener(new FutureEventListener<LogRecordWithDLSN>() { + ).whenComplete(new FutureEventListener<LogRecordWithDLSN>() { @Override public void onSuccess(LogRecordWithDLSN value) { - promise.setValue(Optional.of(selector.result())); + promise.complete(Optional.of(selector.result())); } @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }); @@ -576,12 +560,12 @@ public class ReadUtils { @Override public void onFailure(final Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }; entryStore.openRandomAccessReader(segment, false) - .addEventListener(FutureEventListenerRunnable.of(openReaderListener, executorService)); + .whenCompleteAsync(openReaderListener, executorService); return promise; } @@ -617,12 +601,12 @@ public class ReadUtils { final List<Long> entriesToSearch, final int nWays, final Optional<LogRecordWithDLSN> prevFoundRecord, - final Promise<Optional<LogRecordWithDLSN>> promise) { - final List<Future<LogRecordWithDLSN>> searchResults = + final CompletableFuture<Optional<LogRecordWithDLSN>> promise) { + final List<CompletableFuture<LogRecordWithDLSN>> searchResults = Lists.newArrayListWithExpectedSize(entriesToSearch.size()); for (Long entryId : entriesToSearch) { LogRecordSelector selector = new FirstTxIdNotLessThanSelector(transactionId); - Future<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries( + CompletableFuture<LogRecordWithDLSN> searchResult = asyncReadRecordFromEntries( logName, reader, segment, @@ -649,11 +633,11 @@ public class ReadUtils { @Override public void onFailure(Throwable cause) { - promise.setException(cause); + promise.completeExceptionally(cause); } }; - Future.collect(searchResults).addEventListener( - FutureEventListenerRunnable.of(processSearchResultsListener, executorService)); + FutureUtils.collect(searchResults).whenCompleteAsync( + processSearchResultsListener, executorService); } /** @@ -668,7 +652,7 @@ public class ReadUtils { final List<LogRecordWithDLSN> searchResults, final int nWays, final Optional<LogRecordWithDLSN> prevFoundRecord, - final Promise<Optional<LogRecordWithDLSN>> promise) { + final CompletableFuture<Optional<LogRecordWithDLSN>> promise) { int found = -1; for (int i = 0; i < searchResults.size(); i++) { LogRecordWithDLSN record = searchResults.get(i); @@ -678,7 +662,7 @@ public class ReadUtils { } } if (found == -1) { // all log records' transaction id is less than provided transaction id - promise.setValue(prevFoundRecord); + promise.complete(prevFoundRecord); return; } // we found a log record @@ -691,7 +675,7 @@ public class ReadUtils { if (foundRecord.getDlsn().getSlotId() != 0L || found == 0 || foundRecord.getDlsn().getEntryId() == (searchResults.get(found - 1).getDlsn().getEntryId() + 1)) { - promise.setValue(Optional.of(foundRecord)); + promise.complete(Optional.of(foundRecord)); return; } @@ -702,7 +686,7 @@ public class ReadUtils { searchResults.get(found), nWays); if (nextSearchBatch.isEmpty()) { - promise.setValue(prevFoundRecord); + promise.complete(prevFoundRecord); return; } 
getLogRecordNotLessThanTxIdFromEntries( http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java index a2109f4..04bb9e4 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/WriteLimiter.java @@ -18,7 +18,7 @@ package org.apache.distributedlog; import org.apache.distributedlog.exceptions.OverCapacityException; -import org.apache.distributedlog.util.PermitLimiter; +import org.apache.distributedlog.common.util.PermitLimiter; class WriteLimiter { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java index 4e94984..c4939c0 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/admin/DistributedLogAdmin.java @@ -19,12 +19,19 @@ package org.apache.distributedlog.admin; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.distributedlog.DistributedLogManager; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.bookkeeper.util.IOUtils; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.LogSegmentMetadata; import org.apache.distributedlog.ReadUtils; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClientBuilder; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.impl.BKNamespaceDriver; import org.apache.distributedlog.impl.acl.ZKAccessControl; import org.apache.distributedlog.exceptions.DLIllegalStateException; @@ -35,21 +42,12 @@ import org.apache.distributedlog.metadata.DLMetadata; import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater; import org.apache.distributedlog.metadata.MetadataUpdater; import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.thrift.AccessControlEntry; import org.apache.distributedlog.tools.DistributedLogTool; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.OrderedScheduler; -import org.apache.distributedlog.util.SchedulerUtils; -import com.twitter.util.Await; -import com.twitter.util.Function; -import com.twitter.util.Future; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.util.IOUtils; -import 
org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; +import org.apache.distributedlog.common.util.SchedulerUtils; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,11 +93,11 @@ public class DistributedLogAdmin extends DistributedLogTool { * is confirmation needed before executing actual action. * @throws IOException */ - public static void fixInprogressSegmentWithLowerSequenceNumber(final DistributedLogNamespace namespace, + public static void fixInprogressSegmentWithLowerSequenceNumber(final Namespace namespace, final MetadataUpdater metadataUpdater, final String streamName, final boolean verbose, - final boolean interactive) throws IOException { + final boolean interactive) throws Exception { DistributedLogManager dlm = namespace.openLog(streamName); try { List<LogSegmentMetadata> segments = dlm.getLogSegments(); @@ -193,21 +191,21 @@ public class DistributedLogAdmin extends DistributedLogTool { } public static void checkAndRepairDLNamespace(final URI uri, - final DistributedLogNamespace namespace, + final Namespace namespace, final MetadataUpdater metadataUpdater, final OrderedScheduler scheduler, final boolean verbose, - final boolean interactive) throws IOException { + final boolean interactive) throws Exception { checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, 1); } public static void checkAndRepairDLNamespace(final URI uri, - final DistributedLogNamespace namespace, + final Namespace namespace, final MetadataUpdater metadataUpdater, final OrderedScheduler scheduler, final boolean verbose, final boolean interactive, - final int concurrency) throws IOException { + final int concurrency) throws Exception { Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found."); // 0. getting streams under a given uri. 
Iterator<String> streamsIter = namespace.getLogs(); @@ -247,7 +245,7 @@ public class DistributedLogAdmin extends DistributedLogTool { } private static Map<String, StreamCandidate> checkStreams( - final DistributedLogNamespace namespace, + final Namespace namespace, final Collection<String> streams, final OrderedScheduler scheduler, final int concurrency) throws IOException { @@ -274,7 +272,7 @@ public class DistributedLogAdmin extends DistributedLogTool { LOG.info("Checking stream {}.", stream); candidate = checkStream(namespace, stream, scheduler); LOG.info("Checked stream {} - {}.", stream, candidate); - } catch (IOException e) { + } catch (Throwable e) { LOG.error("Error on checking stream {} : ", stream, e); doneLatch.countDown(); break; @@ -313,7 +311,7 @@ public class DistributedLogAdmin extends DistributedLogTool { } private static StreamCandidate checkStream( - final DistributedLogNamespace namespace, + final Namespace namespace, final String streamName, final OrderedScheduler scheduler) throws IOException { DistributedLogManager dlm = namespace.openLog(streamName); @@ -322,14 +320,14 @@ public class DistributedLogAdmin extends DistributedLogTool { if (segments.isEmpty()) { return null; } - List<Future<LogSegmentCandidate>> futures = - new ArrayList<Future<LogSegmentCandidate>>(segments.size()); + List<CompletableFuture<LogSegmentCandidate>> futures = + new ArrayList<CompletableFuture<LogSegmentCandidate>>(segments.size()); for (LogSegmentMetadata segment : segments) { futures.add(checkLogSegment(namespace, streamName, segment, scheduler)); } List<LogSegmentCandidate> segmentCandidates; try { - segmentCandidates = Await.result(Future.collect(futures)); + segmentCandidates = FutureUtils.result(FutureUtils.collect(futures)); } catch (Exception e) { throw new IOException("Failed on checking stream " + streamName, e); } @@ -348,13 +346,13 @@ public class DistributedLogAdmin extends DistributedLogTool { } } - private static Future<LogSegmentCandidate> checkLogSegment( - final DistributedLogNamespace namespace, + private static CompletableFuture<LogSegmentCandidate> checkLogSegment( + final Namespace namespace, final String streamName, final LogSegmentMetadata metadata, final OrderedScheduler scheduler) { if (metadata.isInProgress()) { - return Future.value(null); + return FutureUtils.value(null); } final LogSegmentEntryStore entryStore = namespace.getNamespaceDriver() @@ -370,7 +368,7 @@ public class DistributedLogAdmin extends DistributedLogTool { new AtomicInteger(0), scheduler, entryStore - ).map(new Function<LogRecordWithDLSN, LogSegmentCandidate>() { + ).thenApply(new Function<LogRecordWithDLSN, LogSegmentCandidate>() { @Override public LogSegmentCandidate apply(LogRecordWithDLSN record) { if (null != record && @@ -388,7 +386,7 @@ public class DistributedLogAdmin extends DistributedLogTool { private static boolean repairStream(MetadataUpdater metadataUpdater, StreamCandidate streamCandidate, boolean verbose, - boolean interactive) throws IOException { + boolean interactive) throws Exception { if (verbose) { System.out.println("Stream " + streamCandidate.streamName + " : "); for (LogSegmentCandidate segmentCandidate : streamCandidate.segmentCandidates) { @@ -863,7 +861,7 @@ public class DistributedLogAdmin extends DistributedLogTool { protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception { ZKAccessControl accessControl; try { - accessControl = Await.result(ZKAccessControl.read(zkc, zkPath, null)); + accessControl = 
FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null)); } catch (KeeperException.NoNodeException nne) { accessControl = new ZKAccessControl(new AccessControlEntry(), zkPath); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java new file mode 100644 index 0000000..3838bf7 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.io.AsyncCloseable; + +public interface AsyncLogReader extends AsyncCloseable { + + /** + * Get stream name that the reader reads from. + * + * @return stream name. + */ + public String getStreamName(); + + /** + * Read the next record from the log stream + * + * @return A promise that when satisfied will contain the Log Record with its DLSN. + */ + public CompletableFuture<LogRecordWithDLSN> readNext(); + + /** + * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list + * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort + * call. + * + * @param numEntries + * num entries + * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. + */ + public CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries); + + /** + * Read next <i>numEntries</i> entries in a given <i>waitTime</i>. + * <p> + * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>. + * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would + * wait until new entries are available. + * + * @param numEntries + * max entries to return + * @param waitTime + * maximum wait time if there are entries already for read + * @param timeUnit + * wait time unit + * @return A promise that when satisfied will contain a non-empty list of records with their DLSN. 
+ */ + public CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java new file mode 100644 index 0000000..9e12de2 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.io.AsyncAbortable; +import org.apache.distributedlog.io.AsyncCloseable; + +public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { + + /** + * Get the last committed transaction id. + * + * @return last committed transaction id. + */ + public long getLastTxId(); + + /** + * Write a log record to the stream. + * + * @param record single log record + * @return A Future which contains a DLSN if the record was successfully written + * or an exception if the write fails + */ + public CompletableFuture<DLSN> write(LogRecord record); + + /** + * Write log records to the stream in bulk. Each future in the list represents the result of + * one write operation. The size of the result list is equal to the size of the input list. + * Buffers are written in order, and the list of result futures has the same order. + * + * @param record set of log records + * @return A Future which contains a list of Future DLSNs if the record was successfully written + * or an exception if the operation fails. + */ + public CompletableFuture<List<CompletableFuture<DLSN>>> writeBulk(List<LogRecord> record); + + /** + * Truncate the log until <i>dlsn</i>. + * + * @param dlsn + * dlsn to truncate until. + * @return A Future indicates whether the operation succeeds or not, or an exception + * if the truncation fails. 
+ */ + public CompletableFuture<Boolean> truncate(DLSN dlsn); + + /** + * Get the name of the stream this writer writes data to + */ + public String getStreamName(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java new file mode 100644 index 0000000..60f629d --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.AppendOnlyStreamReader; +import org.apache.distributedlog.AppendOnlyStreamWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.LogSegmentMetadata; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.io.AsyncCloseable; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.api.subscription.SubscriptionsStore; + +/** + * A DistributedLogManager is responsible for managing a single place of storing + * edit logs. It may correspond to multiple files, a backup node, etc. + * Even when the actual underlying storage is rolled, or failed and restored, + * each conceptual place of storage corresponds to exactly one instance of + * this class, which is created when the EditLog is first opened. + */ +public interface DistributedLogManager extends AsyncCloseable, Closeable { + + /** + * Get the name of the stream managed by this log manager + * @return streamName + */ + public String getStreamName(); + + /** + * Get the namespace driver used by this manager. + * + * @return the namespace driver + */ + public NamespaceDriver getNamespaceDriver(); + + /** + * Get log segments. + * + * @return log segments + * @throws IOException + */ + public List<LogSegmentMetadata> getLogSegments() throws IOException; + + /** + * Register <i>listener</i> on log segment updates of this stream. + * + * @param listener + * listener to receive update log segment list. + */ + public void registerListener(LogSegmentListener listener) throws IOException ; + + /** + * Unregister <i>listener</i> on log segment updates from this stream. 
+ * + * @param listener + * listener to receive update log segment list. + */ + public void unregisterListener(LogSegmentListener listener); + + /** + * Open async log writer to write records to the log stream. + * + * @return result represents the open result + */ + public CompletableFuture<AsyncLogWriter> openAsyncLogWriter(); + + /** + * Begin writing to the log stream identified by the name + * + * @return the writer interface to generate log records + */ + public LogWriter startLogSegmentNonPartitioned() throws IOException; + + /** + * Begin writing to the log stream identified by the name + * + * @return the writer interface to generate log records + */ + // @Deprecated + public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException; + + /** + * Begin appending to the end of the log stream which is being treated as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; + + /** + * Get a reader to read a log stream as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; + + /** + * Get the input stream starting with fromTxnId for the specified log + * + * @param fromTxnId - the first transaction id we want to read + * @return the stream starting with transaction fromTxnId + * @throws IOException if a stream cannot be found. + */ + public LogReader getInputStream(long fromTxnId) + throws IOException; + + public LogReader getInputStream(DLSN fromDLSN) throws IOException; + + /** + * Open an async log reader to read records from a log starting from <code>fromTxnId</code>. + * + * @param fromTxnId + * transaction id to start reading from + * @return async log reader + */ + public CompletableFuture<AsyncLogReader> openAsyncLogReader(long fromTxnId); + + /** + * Open an async log reader to read records from a log starting from <code>fromDLSN</code> + * + * @param fromDLSN + * dlsn to start reading from + * @return async log reader + */ + public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN); + + // @Deprecated + public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; + + // @Deprecated + public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException; + + public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN); + + /** + * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>. + * If two readers tried to open using same subscriberId, one would succeed, while the other + * will be blocked until it gets the lock. + * + * @param fromDLSN + * start dlsn + * @param subscriberId + * subscriber id + * @return async log reader + */ + public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId); + + /** + * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from + * its last commit position recorded in subscription store. If no last commit position found + * in subscription store, it would start reading from head of the stream. + * + * If the two readers tried to open using same subscriberId, one would succeed, while the other + * will be blocked until it gets the lock. 
+ * + * @param subscriberId + * subscriber id + * @return async log reader + */ + public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId); + + /** + * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>. + * + * @param transactionId + * transaction id + * @return dlsn of first log record whose transaction id is not less than transactionId. + */ + public CompletableFuture<DLSN> getDLSNNotLessThanTxId(long transactionId); + + /** + * Get the last log record in the stream + * + * @return the last log record in the stream + * @throws IOException if a stream cannot be found. + */ + public LogRecordWithDLSN getLastLogRecord() + throws IOException; + + /** + * Get the earliest Transaction Id available in the log + * + * @return earliest transaction id + * @throws IOException + */ + public long getFirstTxId() throws IOException; + + /** + * Get Latest Transaction Id in the log + * + * @return latest transaction id + * @throws IOException + */ + public long getLastTxId() throws IOException; + + /** + * Get Latest DLSN in the log + * + * @return last dlsn + * @throws IOException + */ + public DLSN getLastDLSN() throws IOException; + + /** + * Get Latest log record with DLSN in the log - async + * + * @return latest log record with DLSN + */ + public CompletableFuture<LogRecordWithDLSN> getLastLogRecordAsync(); + + /** + * Get Latest Transaction Id in the log - async + * + * @return latest transaction id + */ + public CompletableFuture<Long> getLastTxIdAsync(); + + /** + * Get first DLSN in the log. + * + * @return first dlsn in the stream + */ + public CompletableFuture<DLSN> getFirstDLSNAsync(); + + /** + * Get Latest DLSN in the log - async + * + * @return latest transaction id + */ + public CompletableFuture<DLSN> getLastDLSNAsync(); + + /** + * Get the number of log records in the active portion of the log + * Any log segments that have already been truncated will not be included + * + * @return number of log records + * @throws IOException + */ + public long getLogRecordCount() throws IOException; + + /** + * Get the number of log records in the active portion of the log - async. + * Any log segments that have already been truncated will not be included + * + * @return future number of log records + * @throws IOException + */ + public CompletableFuture<Long> getLogRecordCountAsync(final DLSN beginDLSN); + + /** + * Run recovery on the log. + * + * @throws IOException + */ + public void recover() throws IOException; + + /** + * Check if an end of stream marker was added to the stream + * A stream with an end of stream marker cannot be appended to + * + * @return true if the marker was added to the stream, false otherwise + * @throws IOException + */ + public boolean isEndOfStreamMarked() throws IOException; + + /** + * Delete the log. + * + * @throws IOException if the deletion fails + */ + public void delete() throws IOException; + + /** + * The DistributedLogManager may archive/purge any logs for transactionId + * less than or equal to minImageTxId. + * This is to be used only when the client explicitly manages deletion. If + * the cleanup policy is based on sliding time window, then this method need + * not be called. + * + * @param minTxIdToKeep the earliest txid that must be retained + * @throws IOException if purging fails + */ + public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; + + /** + * Get the subscriptions store provided by the distributedlog manager. 
+ * + * @return subscriptions store manages subscriptions for current stream. + */ + public SubscriptionsStore getSubscriptionsStore(); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java new file mode 100644 index 0000000..631a8a9 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogReader.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api; + +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.io.AsyncCloseable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * <i>LogReader</i> is a `synchronous` reader reading records from a DL log. + * + * <h3>Lifecycle of a Reader</h3> + * + * A reader is a <i>sequential</i> reader that read records from a DL log starting + * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)} + * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}. + * <p> + * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)} + * to read records out the log from provided position. + * <p> + * Closing the reader (via {@link #close()} will release all the resources occupied + * by this reader instance. + * <p> + * Exceptions could be thrown during reading records. Once the exception is thrown, + * the reader is set to an error state and it isn't usable anymore. It is the application's + * responsibility to handle the exceptions and re-create readers if necessary. + * <p> + * Example: + * <pre> + * DistributedLogManager dlm = ...; + * long nextTxId = ...; + * LogReader reader = dlm.getInputStream(nextTxId); + * + * while (true) { // keep reading & processing records + * LogRecord record; + * try { + * record = reader.readNext(false); + * nextTxId = record.getTransactionId(); + * // process the record + * ... + * } catch (IOException ioe) { + * // handle the exception + * ... + * reader = dlm.getInputStream(nextTxId + 1); + * } + * } + * + * </pre> + * + * <h3>Read Records</h3> + * + * Reading records from an <i>endless</i> log in `synchronous` way isn't as + * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it + * lacks of callback mechanism. 
LogReader introduces a flag `nonBlocking` on + * controlling the <i>waiting</i> behavior on `synchronous` reads. + * + * <h4>Blocking vs NonBlocking</h4> + * + * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records + * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true) + * means the reads will only check readahead cache and return whatever records + * available in the readahead cache. + * <p> + * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is + * catching up with writer (there are records in the log), the read call will + * wait until records are read and returned. If the reader is caught up with + * writer (there are no more records in the log at read time), the read call + * will wait for a small period of time (defined in + * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever + * records available in the readahead cache. In other words, if a reader sees + * no record on blocking reads, it means the reader is `caught-up` with the + * writer. + * <p> + * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated + * state machines. Applications could use <i>blocking</i> reads till caught up + * with latest data. Once they are caught up with latest data, they could start + * serving their service and turn to <i>non-blocking</i> read mode and tail read + * data from the logs. + * <p> + * See examples below. + * + * <h4>Read Single Record</h4> + * + * {@link #readNext(boolean)} is reading individual records from a DL log. + * + * <pre> + * LogReader reader = ... + * + * // keep reading records in blocking way until no records available in the log + * LogRecord record = reader.readNext(false); + * while (null != record) { + * // process the record + * ... + * // read next record + * records = reader.readNext(false); + * } + * + * ... + * + * // reader is caught up with writer, doing non-blocking reads to tail the log + * while (true) { + * record = reader.readNext(true) + * // process the new records + * ... + * } + * </pre> + * + * <h4>Read Batch of Records</h4> + * + * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records + * from a DL log. + * + * <pre> + * LogReader reader = ... + * int N = 10; + * + * // keep reading N records in blocking way until no records available in the log + * List<LogRecord> records = reader.readBulk(false, N); + * while (!records.isEmpty()) { + * // process the list of records + * ... + * if (records.size() < N) { // no more records available in the log + * break; + * } + * // read next N records + * records = reader.readBulk(false, N); + * } + * + * ... + * + * // reader is caught up with writer, doing non-blocking reads to tail the log + * while (true) { + * records = reader.readBulk(true, N) + * // process the new records + * ... + * } + * + * </pre> + * + * <p> + * NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing + * the {@link AsyncCloseable} interface so the reader could be closed asynchronously + * + * @see AsyncLogReader + */ +public interface LogReader extends Closeable, AsyncCloseable { + + /** + * Read the next log record from the stream. + * <p> + * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling + * records from read ahead cache. It would return <i>null</i> if there isn't any records + * available in the read ahead cache. + * <p> + * If <i>nonBlocking</i> is set to false, it would does blocking call. 
+     * The call will block until it returns a record when there are records in the stream
+     * (i.e. the reader is catching up).
+     * Otherwise it will wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
+     * milliseconds and return null if there aren't any more records in the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     *        readAhead cache
+     * @return an operation from the stream or null if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     */
+    public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
+
+    /**
+     * Read the next <i>numLogRecords</i> log records from the stream.
+     *
+     * @param nonBlocking should the read make blocking calls to the backend or rely on the
+     *        readAhead cache
+     * @param numLogRecords maximum number of log records returned by this call.
+     * @return an operation from the stream or empty list if at end of stream
+     * @throws IOException if there is an error reading from the stream
+     * @see #readNext(boolean)
+     */
+    public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java
new file mode 100644
index 0000000..46ad1f0
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/LogWriter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.io.Abortable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/*
+* A generic interface class to support writing log records into
+* a persistent distributed log.
+*/
+public interface LogWriter extends Closeable, Abortable {
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     * @throws IOException
+     */
+    public void write(LogRecord record) throws IOException;
+
+
+    /**
+     * Write a list of log records to the stream.
+     *
+     * @param records list of log records
+     * @throws IOException
+     */
+    @Deprecated
+    public int writeBulk(List<LogRecord> records) throws IOException;
+
+    /**
+     * All data that has been written to the stream so far will be sent to
+     * persistent storage.
+     * The transmission is asynchronous and new data can still be written to the
+     * stream while flushing is performed.
+     *
+     * TODO: rename this to flush()
+     */
+    public long setReadyToFlush() throws IOException;
+
+    /**
+     * Flush and sync all data that is ready to be flushed
+     * ({@link #setReadyToFlush()}) into the underlying persistent store.
+     * @throws IOException
+     *
+     * TODO: rename this to commit()
+     */
+    public long flushAndSync() throws IOException;
+
+    /**
+     * Flushes all the data up to this point,
+     * adds the end of stream marker and marks the stream
+     * as read-only in the metadata. No appends to the
+     * stream will be allowed after this point.
+     *
+     * @throws IOException
+     */
+    public void markEndOfStream() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java
new file mode 100644
index 0000000..76ef700
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api;
+
+import org.apache.distributedlog.io.AsyncCloseable;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface MetadataAccessor extends Closeable, AsyncCloseable {
+    /**
+     * Get the name of the stream managed by this log manager.
+     * @return streamName
+     */
+    public String getStreamName();
+
+    public void createOrUpdateMetadata(byte[] metadata) throws IOException;
+
+    public void deleteMetadata() throws IOException;
+
+    public byte[] getMetadata() throws IOException;
+
+    /**
+     * Close the distributed log metadata, freeing any resources it may hold.
+     */
+    public void close() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
new file mode 100644
index 0000000..818824d
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
+ * The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.namespace;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Optional;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.exceptions.InvalidStreamNameException;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+
+/**
+ * A namespace is the basic unit for managing a set of distributedlogs.
+ *
+ * <h4>Namespace Interface</h4>
+ *
+ * <P>
+ * The <code>Namespace</code> interface is implemented by different backend providers.
+ * Several components are required for an implementation:
+ * <OL>
+ * <LI>Log Management -- manage logs in a given namespace, e.g. create/open/delete logs, list logs,
+ * watch for changes of logs.
+ * <LI>Access Control -- manage the access controls for logs in the namespace.
+ * </OL>
+ * </P>
+ *
+ * <h4>Namespace Location</h4>
+ *
+ * At the highest level, a <code>Namespace</code> is located by a <code>URI</code>. The location
+ * URI, in string form, has the syntax
+ *
+ * <blockquote>
+ * distributedlog[<tt><b>-</b></tt><i>provider</i>]<tt><b>:</b></tt><i>provider-specific-path</i>
+ * </blockquote>
+ *
+ * where square brackets [...] delineate optional components and the characters <tt><b>-</b></tt> and <tt><b>:</b></tt>
+ * stand for themselves.
+ *
+ * The <code>provider</code> part in the URI indicates which backend is used for this namespace. For example:
+ * a <i>distributedlog-bk</i> URI stores logs in bookkeeper, while a <i>distributedlog-mem</i> URI stores logs in
+ * memory. The <code>provider</code> part is optional; the bookkeeper backend is used if the <i>provider</i> part
+ * is omitted.
+ *
+ * @see DistributedLogManager
+ * @since 0.3.32
+ */
+@Beta
+public interface Namespace {
+
+    /**
+     * Get the namespace driver used by this namespace.
+     *
+     * @return namespace driver
+     */
+    NamespaceDriver getNamespaceDriver();
+
+    //
+    // Methods to operate on logs
+    //
+
+    /**
+     * Create a log named <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountering issues with the backend.
+     */
+    void createLog(String logName)
+        throws InvalidStreamNameException, IOException;
+
+    /**
+     * Delete a log named <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @throws InvalidStreamNameException if log name is invalid
+     * @throws LogNotFoundException if log doesn't exist
+     * @throws IOException when encountering issues with the backend
+     */
+    void deleteLog(String logName)
+        throws InvalidStreamNameException, LogNotFoundException, IOException;
+
+    /**
+     * Open a log named <i>logName</i>.
+     * A distributedlog manager is returned to access log <i>logName</i>.
+     *
+     * @param logName
+     *          name of the log
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountering issues with the backend.
+     */
+    DistributedLogManager openLog(String logName)
+        throws InvalidStreamNameException, IOException;
+
+    /**
+     * Open a log named <i>logName</i> with specific log configurations.
+     *
+     * <p>This method allows the caller to override global configuration settings by
+     * supplying log configuration overrides. Log config overrides come in two flavors,
+     * static and dynamic. Static config never changes in the lifecycle of <code>DistributedLogManager</code>,
+     * while dynamic config is reloaded periodically and is safe to access from any context.</p>
+     *
+     * @param logName
+     *          name of the log
+     * @param logConf
+     *          static log configuration
+     * @param dynamicLogConf
+     *          dynamic log configuration
+     * @return distributedlog manager instance.
+     * @throws InvalidStreamNameException if log name is invalid.
+     * @throws IOException when encountering issues with the backend.
+     */
+    DistributedLogManager openLog(String logName,
+                                  Optional<DistributedLogConfiguration> logConf,
+                                  Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+                                  Optional<StatsLogger> perStreamStatsLogger)
+        throws InvalidStreamNameException, IOException;
+
+    /**
+     * Check whether the log <i>logName</i> exists.
+     *
+     * @param logName
+     *          name of the log
+     * @return <code>true</code> if the log exists, otherwise <code>false</code>.
+     * @throws IOException when encountering exceptions while checking
+     */
+    boolean logExists(String logName)
+        throws IOException;
+
+    /**
+     * Retrieve the logs under the namespace.
+     *
+     * @return iterator of the logs under the namespace.
+     * @throws IOException when encountering issues with the backend.
+     */
+    Iterator<String> getLogs()
+        throws IOException;
+
+    //
+    // Methods for namespace
+    //
+
+    /**
+     * Register a namespace listener for stream updates under the namespace.
+     *
+     * @param listener
+     *          listener to receive stream updates under the namespace
+     */
+    void registerNamespaceListener(NamespaceListener listener);
+
+    /**
+     * Create an access control manager to manage/check ACLs for logs.
+     *
+     * @return access control manager for logs under the namespace.
+     * @throws IOException
+     */
+    AccessControlManager createAccessControlManager()
+        throws IOException;
+
+    /**
+     * Close the namespace.
+     */
+    void close();
+
+}
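
Taken together, the interfaces added above are typically used in this order: build a namespace, open a log, write records, then read them back. A minimal sketch of the namespace step, assuming a companion NamespaceBuilder in the same package (not shown in this change) is used to construct the namespace; the ZooKeeper address, namespace path and log name are placeholders.

    import java.net.URI;

    import org.apache.distributedlog.DistributedLogConfiguration;
    import org.apache.distributedlog.api.DistributedLogManager;
    import org.apache.distributedlog.api.namespace.Namespace;
    import org.apache.distributedlog.api.namespace.NamespaceBuilder;

    public final class NamespaceExample {

        public static void main(String[] args) throws Exception {
            DistributedLogConfiguration conf = new DistributedLogConfiguration();
            // No provider part in the URI, so the bookkeeper backend is the default.
            URI uri = URI.create("distributedlog://127.0.0.1:2181/messaging/my-namespace");
            Namespace namespace = NamespaceBuilder.newBuilder()
                    .conf(conf)
                    .uri(uri)
                    .build();
            try {
                if (!namespace.logExists("basic-stream-1")) {
                    namespace.createLog("basic-stream-1");
                }
                DistributedLogManager dlm = namespace.openLog("basic-stream-1");
                // use dlm to open readers/writers for the log
                dlm.close();
            } finally {
                namespace.close();
            }
        }
    }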
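
The LogWriter interface above splits durability into two steps: setReadyToFlush() hands buffered data to the transport asynchronously, and flushAndSync() waits until that data reaches the persistent store. A minimal sketch of that flow, under the assumption that the synchronous writer is obtained via DistributedLogManager#startLogSegmentNonPartitioned() (an entry point not shown in this change); the payloads and transaction ids are placeholders.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.distributedlog.LogRecord;
    import org.apache.distributedlog.api.DistributedLogManager;
    import org.apache.distributedlog.api.LogWriter;

    public final class WriteThenCommit {

        // Write a small batch, then make it durable in two steps.
        static void writeBatch(DistributedLogManager dlm, long firstTxId) throws IOException {
            LogWriter writer = dlm.startLogSegmentNonPartitioned(); // assumed sync-writer entry point
            try {
                for (int i = 0; i < 10; i++) {
                    byte[] payload = ("record-" + i).getBytes(StandardCharsets.UTF_8);
                    writer.write(new LogRecord(firstTxId + i, payload));
                }
                writer.setReadyToFlush(); // start asynchronous transmission to persistent storage
                writer.flushAndSync();    // block until the transmitted data is persisted
            } finally {
                writer.close();
            }
        }
    }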
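
On the read side, the LogReader Javadoc above describes a catch-up-then-tail pattern: blocking reads until the reader is caught up with the writer, then non-blocking reads that only poll the readahead cache. A minimal sketch of that pattern, assuming the DistributedLogManager comes from Namespace#openLog and that reading starts from DLSN.InitialDLSN; the process(...) helper is a hypothetical placeholder for application logic.

    import java.io.IOException;

    import org.apache.distributedlog.DLSN;
    import org.apache.distributedlog.LogRecordWithDLSN;
    import org.apache.distributedlog.api.DistributedLogManager;
    import org.apache.distributedlog.api.LogReader;

    public final class CatchUpThenTail {

        // Phase 1: blocking reads (readNext(false)) until null is returned,
        // which per the Javadoc above means the reader is caught up with the writer.
        // Phase 2: non-blocking reads (readNext(true)) that tail new records.
        static void readLoop(DistributedLogManager dlm) throws IOException {
            LogReader reader = dlm.getInputStream(DLSN.InitialDLSN);
            try {
                LogRecordWithDLSN record = reader.readNext(false);
                while (null != record) {
                    process(record);
                    record = reader.readNext(false);
                }
                while (!Thread.currentThread().isInterrupted()) {
                    record = reader.readNext(true);
                    if (null != record) {
                        process(record);
                    }
                }
            } finally {
                reader.close();
            }
        }

        private static void process(LogRecordWithDLSN record) {
            // hypothetical application-specific processing
            System.out.println("read record at " + record.getDlsn());
        }
    }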