http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java deleted file mode 100644 index adf49a1..0000000 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Ticker; -import com.twitter.distributedlog.exceptions.EndOfStreamException; -import com.twitter.distributedlog.exceptions.IdleReaderException; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.Utils; -import com.twitter.util.Future; -import com.twitter.util.Promise; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.versioning.Versioned; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Synchronous Log Reader based on {@link AsyncLogReader} - */ -class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { - - private final BKDistributedLogManager bkdlm; - private final BKLogReadHandler readHandler; - private final AtomicReference<IOException> readerException = - new AtomicReference<IOException>(null); - private final int maxReadAheadWaitTime; - private Promise<Void> closeFuture; - private final Optional<Long> startTransactionId; - private boolean positioned = false; - private Entry.Reader currentEntry = null; - - // readahead reader - ReadAheadEntryReader readAheadReader = null; - - // idle reader settings - private final boolean shouldCheckIdleReader; - private final int idleErrorThresholdMillis; - - // Stats - private final Counter idleReaderError; - - BKSyncLogReaderDLSN(DistributedLogConfiguration conf, - BKDistributedLogManager bkdlm, - DLSN startDLSN, - Optional<Long> startTransactionId, - StatsLogger statsLogger) throws IOException { - this.bkdlm = bkdlm; - this.readHandler = bkdlm.createReadHandler( - Optional.<String>absent(), - this, - true); - this.maxReadAheadWaitTime = conf.getReadAheadWaitTime(); - this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis(); - this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE; - this.startTransactionId = startTransactionId; - - // start readahead - startReadAhead(startDLSN); - if (!startTransactionId.isPresent()) { - positioned = true; - } - - // Stats - StatsLogger syncReaderStatsLogger = statsLogger.scope("sync_reader"); - idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error"); - } - - private void startReadAhead(DLSN startDLSN) throws IOException { - readAheadReader = new ReadAheadEntryReader( - bkdlm.getStreamName(), - startDLSN, - bkdlm.getConf(), - readHandler, - bkdlm.getReaderEntryStore(), - bkdlm.getScheduler(), - Ticker.systemTicker(), - bkdlm.alertStatsLogger); - readHandler.registerListener(readAheadReader); - readHandler.asyncStartFetchLogSegments() - .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadReader.addStateChangeNotification(BKSyncLogReaderDLSN.this); - readAheadReader.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } - }); - } - - @VisibleForTesting - ReadAheadEntryReader getReadAheadReader() { - return readAheadReader; - } - - @VisibleForTesting - BKLogReadHandler getReadHandler() { - return readHandler; - } - - private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException { - Entry.Reader entry = null; - if (nonBlocking) { - return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); - } else { - while (!readAheadReader.isReadAheadCaughtUp() - && null == readerException.get() - && null == entry) { - entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS); - } - if (null != entry) { - return entry; - } - // reader is caught up - if (readAheadReader.isReadAheadCaughtUp() - && null == readerException.get()) { - entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS); - } - return entry; - } - } - - private void markReaderAsIdle() throws IdleReaderException { - idleReaderError.inc(); - IdleReaderException ire = new IdleReaderException("Sync reader on stream " - + readHandler.getFullyQualifiedName() - + " is idle for more than " + idleErrorThresholdMillis + " ms"); - readerException.compareAndSet(null, ire); - throw ire; - } - - @Override - public synchronized LogRecordWithDLSN readNext(boolean nonBlocking) - throws IOException { - if (null != readerException.get()) { - throw readerException.get(); - } - LogRecordWithDLSN record = doReadNext(nonBlocking); - // no record is returned, check if the reader becomes idle - if (null == record && shouldCheckIdleReader) { - if (readAheadReader.getNumCachedEntries() <= 0 && - readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) { - markReaderAsIdle(); - } - } - return record; - } - - private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException { - LogRecordWithDLSN record = null; - - do { - // fetch one record until we don't find any entry available in the readahead cache - while (null == record) { - if (null == currentEntry) { - currentEntry = readNextEntry(nonBlocking); - if (null == currentEntry) { - return null; - } - } - record = currentEntry.nextRecord(); - if (null == record) { - currentEntry = null; - } - } - - // check if we reached the end of stream - if (record.isEndOfStream()) { - EndOfStreamException eos = new EndOfStreamException("End of Stream Reached for " - + readHandler.getFullyQualifiedName()); - readerException.compareAndSet(null, eos); - throw eos; - } - // skip control records - if (record.isControl()) { - record = null; - continue; - } - if (!positioned) { - if (record.getTransactionId() < startTransactionId.get()) { - record = null; - continue; - } else { - positioned = true; - break; - } - } else { - break; - } - } while (true); - return record; - } - - @Override - public synchronized List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) - throws IOException { - LinkedList<LogRecordWithDLSN> retList = - new LinkedList<LogRecordWithDLSN>(); - - int numRead = 0; - LogRecordWithDLSN record = readNext(nonBlocking); - while ((null != record)) { - retList.add(record); - numRead++; - if (numRead >= numLogRecords) { - break; - } - record = readNext(nonBlocking); - } - return retList; - } - - @Override - public Future<Void> asyncClose() { - Promise<Void> closePromise; - synchronized (this) { - if (null != closeFuture) { - return closeFuture; - } - closeFuture = closePromise = new Promise<Void>(); - } - readHandler.unregisterListener(readAheadReader); - readAheadReader.removeStateChangeNotification(this); - Utils.closeSequence(bkdlm.getScheduler(), true, - readAheadReader, - readHandler - ).proxyTo(closePromise); - return closePromise; - } - - @Override - public void close() throws IOException { - FutureUtils.result(asyncClose()); - } - - // - // Notification From ReadHandler - // - - @Override - public void notifyOnError(Throwable cause) { - if (cause instanceof IOException) { - readerException.compareAndSet(null, (IOException) cause); - } else { - readerException.compareAndSet(null, new IOException(cause)); - } - } - - @Override - public void notifyOnOperationComplete() { - // no-op - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/resources/findbugsExclude.xml b/distributedlog-core/src/main/resources/findbugsExclude.xml index 684b827..c07fad9 100644 --- a/distributedlog-core/src/main/resources/findbugsExclude.xml +++ b/distributedlog-core/src/main/resources/findbugsExclude.xml @@ -33,7 +33,7 @@ <Bug pattern="EI_EXPOSE_REP" /> </Match> <Match> - <Class name="com.twitter.distributedlog.BKAsyncLogReaderDLSN" /> + <Class name="com.twitter.distributedlog.BKAsyncLogReader" /> <Method name="run" /> <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" /> </Match> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java index c34eb6e..3a1ab88 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/NonBlockingReadsTestUtil.java @@ -46,15 +46,15 @@ class NonBlockingReadsTestUtil { boolean forceStall, long segmentSize, boolean waitForIdle) throws Exception { - BKSyncLogReaderDLSN reader = null; + BKSyncLogReader reader = null; try { - reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1); + reader = (BKSyncLogReader) dlm.getInputStream(1); } catch (LogNotFoundException lnfe) { } while (null == reader) { TimeUnit.MILLISECONDS.sleep(20); try { - reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1); + reader = (BKSyncLogReader) dlm.getInputStream(1); } catch (LogNotFoundException lnfe) { } catch (LogEmptyException lee) { } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java index 9927616..a6cffbb 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderLock.java @@ -75,7 +75,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); Future<AsyncLogReader> futureReader1 = dlm.getAsyncLogReaderWithLock(DLSN.InitialDLSN); - BKAsyncLogReaderDLSN reader1 = (BKAsyncLogReaderDLSN) Await.result(futureReader1); + BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(futureReader1); LogRecordWithDLSN record = Await.result(reader1.readNext()); assertEquals(1L, record.getTransactionId()); assertEquals(0L, record.getSequenceId()); @@ -542,7 +542,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { writer.closeAndComplete(); } - BKAsyncLogReaderDLSN reader0 = (BKAsyncLogReaderDLSN) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId)); + BKAsyncLogReader reader0 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId)); assertEquals(DLSN.NonInclusiveLowerBound, reader0.getStartDLSN()); long numTxns = 0; LogRecordWithDLSN record = Await.result(reader0.readNext()); @@ -562,7 +562,7 @@ public class TestAsyncReaderLock extends TestDistributedLogBase { SubscriptionsStore subscriptionsStore = dlm.getSubscriptionsStore(); Await.result(subscriptionsStore.advanceCommitPosition(subscriberId, readDLSN)); - BKAsyncLogReaderDLSN reader1 = (BKAsyncLogReaderDLSN) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId)); + BKAsyncLogReader reader1 = (BKAsyncLogReader) Await.result(dlm.getAsyncLogReaderWithLock(subscriberId)); assertEquals(readDLSN, reader1.getStartDLSN()); numTxns = 0; long startTxID = 10L; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index 95d760e..65507ac 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -1397,7 +1397,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { }, 0, TimeUnit.MILLISECONDS); latch.await(); - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); if (simulateReaderStall) { reader.disableProcessingReadRequests(); } @@ -1538,7 +1538,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { }, 0, TimeUnit.MILLISECONDS); latch.await(); - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN)dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader)dlm.getAsyncLogReader(DLSN.InitialDLSN); reader.disableReadAheadLogSegmentsNotification(); boolean exceptionEncountered = false; int recordCount = 0; @@ -1616,7 +1616,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { record.setControl(); Await.result(writer.write(record)); - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); record = Await.result(reader.readNext()); LOG.info("Read record {}", record); assertEquals(1L, record.getTransactionId()); @@ -1668,7 +1668,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } catch (EndOfStreamException ex) { } - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); LogRecord record = null; for (int j = 0; j < NUM_RECORDS; j++) { record = Await.result(reader.readNext()); @@ -1702,7 +1702,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { } writer.close(); - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); try { LogRecord record = Await.result(reader.readNext()); fail("Should have thrown"); @@ -1727,7 +1727,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { controlRecord.setControl(); FutureUtils.result(writer.write(controlRecord)); - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS); Future<LogRecordWithDLSN> readFuture = reader.readNext(); @@ -1772,7 +1772,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase { controlRecord.setControl(); FutureUtils.result(writer.write(controlRecord)); - BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) dlm.getAsyncLogReader(DLSN.InitialDLSN); + BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN); Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS); Future<LogRecordWithDLSN> readFuture = reader.readNext(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java index 54177c8..d28b62c 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestBKSyncLogReader.java @@ -159,7 +159,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase { // all 10 records are added to the stream // then open a reader to read - BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L); + BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L); // wait until readahead caught up while (!reader.getReadAheadReader().isReadAheadCaughtUp()) { @@ -226,7 +226,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase { logger.info("Write first 10 records"); // open a reader to read - BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L); + BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L); // resume reading from sync reader. so it should be able to read all 10 records // and return null to claim it as caughtup LogRecord record = reader.readNext(false); @@ -283,7 +283,7 @@ public class TestBKSyncLogReader extends TestDistributedLogBase { }, 0, 400, TimeUnit.MILLISECONDS); // open a reader to read - BKSyncLogReaderDLSN reader = (BKSyncLogReaderDLSN) dlm.getInputStream(1L); + BKSyncLogReader reader = (BKSyncLogReader) dlm.getInputStream(1L); // resume reading from sync reader. so it should be able to read all 10 records // and return null to claim it as caughtup LogRecord record = reader.readNext(false); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java index 3f47337..bfa9156 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReadsMultiReader.java @@ -99,7 +99,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase { DistributedLogManager dlmread = createNewDLM(conf, name); - BKSyncLogReaderDLSN reader0 = (BKSyncLogReaderDLSN) dlmread.getInputStream(0); + BKSyncLogReader reader0 = (BKSyncLogReader) dlmread.getInputStream(0); try { ReaderThread[] readerThreads = new ReaderThread[1]; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java index 1739b47..6040549 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestReader.java @@ -118,7 +118,7 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> { try { AsyncLogReader reader = dlm.getAsyncLogReader(dlsn); if (simulateErrors) { - ((BKAsyncLogReaderDLSN) reader).simulateErrors(); + ((BKAsyncLogReader) reader).simulateErrors(); } nextDLSN = dlsn; LOG.info("Positioned reader {} at {}", readerName, dlsn); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c7751804/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java index ee53362..99ef041 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestRollLogSegments.java @@ -312,7 +312,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { } private void checkAndWaitWriterReaderPosition(BKLogSegmentWriter writer, long expectedWriterPosition, - BKAsyncLogReaderDLSN reader, long expectedReaderPosition, + BKAsyncLogReader reader, long expectedReaderPosition, LedgerHandle inspector, long expectedLac) throws Exception { while (getLedgerHandle(writer).getLastAddConfirmed() < expectedWriterPosition) { Thread.sleep(1000); @@ -357,7 +357,7 @@ public class TestRollLogSegments extends TestDistributedLogBase { } BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name); - final BKAsyncLogReaderDLSN reader = (BKAsyncLogReaderDLSN) readDLM.getAsyncLogReader(DLSN.InitialDLSN); + final BKAsyncLogReader reader = (BKAsyncLogReader) readDLM.getAsyncLogReader(DLSN.InitialDLSN); // 2) reader should be able to read 5 entries. for (long i = 1; i <= numEntries; i++) {