DL-159: ReadAhead Improvement (part 2) - New ReadAhead Reader using the LogSegmentEntryReader interface
Provide a new ReadAhead reader built on the log segment entry reader interface. It reads entries in batches, with the batches read in parallel rather than one batch after another. This helps mitigate the slow bookie problem. The core change is the new ReadAheadEntryReader. Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7a977972 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7a977972 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7a977972 Branch: refs/heads/master Commit: 7a977972200da9e86f4557bb002ce16cb60d236a Parents: 27c04f3 Author: Sijie Guo <sij...@twitter.com> Authored: Wed Dec 28 15:09:38 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:11:40 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/BKAsyncLogReaderDLSN.java | 198 ++-- .../distributedlog/BKDistributedLogManager.java | 54 +- .../distributedlog/BKLogReadHandler.java | 90 +- .../distributedlog/BKSyncLogReaderDLSN.java | 80 +- .../java/com/twitter/distributedlog/Entry.java | 14 + .../twitter/distributedlog/EntryPosition.java | 63 ++ .../distributedlog/EnvelopedEntryReader.java | 10 + .../com/twitter/distributedlog/MaxTxId.java | 2 +- .../distributedlog/ReadAheadEntryReader.java | 966 +++++++++++++++++++ .../logsegment/BKLogSegmentEntryReader.java | 118 ++- .../impl/logsegment/BKLogSegmentEntryStore.java | 127 +++ .../injector/AsyncFailureInjector.java | 7 +- .../injector/AsyncRandomFailureInjector.java | 15 +- .../logsegment/LogSegmentEntryReader.java | 21 + .../logsegment/LogSegmentEntryStore.java | 4 +- .../rate/MovingAverageRateFactory.java | 2 +- .../readahead/ReadAheadWorker.java | 12 +- .../distributedlog/util/OrderedScheduler.java | 8 + .../distributedlog/TestAsyncReaderLock.java | 2 +- .../distributedlog/TestAsyncReaderWriter.java | 39 +- 
.../distributedlog/TestBKSyncLogReader.java | 5 +- .../distributedlog/TestEntryPosition.java | 59 ++ .../TestNonBlockingReadsMultiReader.java | 5 +- .../twitter/distributedlog/TestReadAhead.java | 158 --- .../TestReadAheadEntryReader.java | 423 ++++++++ .../com/twitter/distributedlog/TestReader.java | 4 + .../distributedlog/TestRollLogSegments.java | 12 +- .../twitter/distributedlog/TestTruncate.java | 1 - .../logsegment/TestBKLogSegmentEntryReader.java | 38 +- .../LogSegmentIsTruncatedException.java | 33 + .../src/main/thrift/service.thrift | 2 + 31 files changed, 2114 insertions(+), 458 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java index 2ca064c..e347012 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java @@ -20,6 +20,7 @@ package com.twitter.distributedlog; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.twitter.distributedlog.exceptions.DLIllegalStateException; import com.twitter.distributedlog.exceptions.DLInterruptedException; import com.twitter.distributedlog.exceptions.EndOfStreamException; @@ -27,9 +28,7 @@ import com.twitter.distributedlog.exceptions.IdleReaderException; import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.ReadCancelledException; import 
com.twitter.distributedlog.exceptions.UnexpectedException; -import com.twitter.distributedlog.injector.AsyncFailureInjector; -import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; -import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.Utils; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; @@ -48,10 +47,12 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Function1; import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; /** * BookKeeper based {@link AsyncLogReader} implementation. @@ -83,7 +84,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio }; protected final BKDistributedLogManager bkDistributedLogManager; - protected final BKLogReadHandler bkLedgerManager; + protected final BKLogReadHandler readHandler; private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); private final ScheduledExecutorService executorService; private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); @@ -92,19 +93,19 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio final private Stopwatch scheduleDelayStopwatch; final private Stopwatch readNextDelayStopwatch; private DLSN startDLSN; - private boolean readAheadStarted = false; + private ReadAheadEntryReader readAheadReader = null; private int lastPosition = 0; private final boolean positionGapDetectionEnabled; private final int idleErrorThresholdMillis; final ScheduledFuture<?> idleReaderTimeoutTask; private ScheduledFuture<?> backgroundScheduleTask = null; + // last process time 
+ private final Stopwatch lastProcessTime; protected Promise<Void> closeFuture = null; private boolean lockStream = false; - private boolean disableReadAheadLogSegmentsNotification = false; - private final boolean returnEndOfStreamRecord; private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { @@ -122,7 +123,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio private LogRecordWithDLSN nextRecord = null; // Failure Injector - private final AsyncFailureInjector failureInjector; private boolean disableProcessingReadRequests = false; // Stats @@ -198,7 +198,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio void complete() { if (LOG.isTraceEnabled()) { - LOG.trace("{} : Satisfied promise with {} records", bkLedgerManager.getFullyQualifiedName(), records.size()); + LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size()); } delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); Stopwatch stopwatch = Stopwatch.createStarted(); @@ -212,12 +212,11 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio DLSN startDLSN, Optional<String> subscriberId, boolean returnEndOfStreamRecord, - boolean deserializeRecordSet, StatsLogger statsLogger) { this.bkDistributedLogManager = bkdlm; this.executorService = executorService; - this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId, - this, deserializeRecordSet, true); + this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId, + this, true); LOG.debug("Starting async reader at {}", startDLSN); this.startDLSN = startDLSN; this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); @@ -226,16 +225,6 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis(); this.returnEndOfStreamRecord = 
returnEndOfStreamRecord; - // Failure Injection - this.failureInjector = AsyncRandomFailureInjector.newBuilder() - .injectDelays(bkdlm.getConf().getEIInjectReadAheadDelay(), - bkdlm.getConf().getEIInjectReadAheadDelayPercent(), - bkdlm.getConf().getEIInjectMaxReadAheadDelayMs()) - .injectErrors(false, 10) - .injectStops(bkdlm.getConf().getEIInjectReadAheadStall(), 10) - .injectCorruption(bkdlm.getConf().getEIInjectReadAheadBrokenEntries()) - .build(); - // Stats StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader"); futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set"); @@ -252,6 +241,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio // Lock the stream if requested. The lock will be released when the reader is closed. this.lockStream = false; this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); + this.lastProcessTime = Stopwatch.createStarted(); } private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { @@ -276,7 +266,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio return; } - ReadAheadCache cache = bkLedgerManager.getReadAheadCache(); + ReadAheadEntryReader readAheadReader = getReadAheadReader(); // read request has been idle // - cache has records but read request are idle, @@ -284,32 +274,27 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio // - cache is empty and readahead is idle (no records added for a long time) idleReaderCheckIdleReadAheadCount.inc(); try { - if (!hasMoreRecords(cache) - && !cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) { + if (null == readAheadReader || (!hasMoreRecords() && + readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) { + markReaderAsIdle(); return; + } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) { + markReaderAsIdle();; } } catch (IOException e) { - // we encountered 
exceptions on checking more records setLastException(e); return; } - - idleReaderError.inc(); - IdleReaderException ire = new IdleReaderException("Reader on stream " - + bkLedgerManager.getFullyQualifiedName() - + " is idle for " + idleErrorThresholdMillis +" ms"); - setLastException(ire); - // cancel all pending reads directly rather than notifying on error - // because idle reader could happen on idle read requests that usually means something wrong - // in scheduling reads - cancelAllPendingReads(ire); } }, period, period, TimeUnit.MILLISECONDS); } - return null; } + synchronized ReadAheadEntryReader getReadAheadReader() { + return readAheadReader; + } + void cancelIdleReaderTask() { // Do this after we have checked that the reader was not previously closed try { @@ -317,12 +302,24 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio idleReaderTimeoutTask.cancel(true); } } catch (Exception exc) { - LOG.info("{}: Failed to cancel the background idle reader timeout task", bkLedgerManager.getFullyQualifiedName()); + LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName()); } } + private void markReaderAsIdle() { + idleReaderError.inc(); + IdleReaderException ire = new IdleReaderException("Reader on stream " + + readHandler.getFullyQualifiedName() + + " is idle for " + idleErrorThresholdMillis +" ms"); + setLastException(ire); + // cancel all pending reads directly rather than notifying on error + // because idle reader could happen on idle read requests that usually means something wrong + // in scheduling reads + cancelAllPendingReads(ire); + } + protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException { - if (readAheadStarted) { + if (null != readAheadReader) { throw new UnexpectedException("Could't reset from dlsn after reader already starts reading."); } startDLSN = fromDLSN; @@ -335,14 +332,14 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, 
Runnable, AsyncNotificatio public Future<Void> lockStream() { this.lockStream = true; - return bkLedgerManager.lockStream(); + return readHandler.lockStream(); } private boolean checkClosedOrInError(String operation) { if (null == lastException.get()) { try { - if (null != bkLedgerManager && null != bkLedgerManager.readAheadWorker) { - bkLedgerManager.readAheadWorker.checkClosedOrInError(); + if (null != readHandler && null != getReadAheadReader()) { + getReadAheadReader().checkLastException(); } bkDistributedLogManager.checkClosedOrInError(operation); @@ -353,7 +350,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio if (lockStream) { try { - bkLedgerManager.checkReadLock(); + readHandler.checkReadLock(); } catch (IOException ex) { setLastException(ex); } @@ -411,28 +408,44 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio readNextDelayStopwatch.reset().start(); final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); - if (!readAheadStarted) { - bkLedgerManager.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { - @Override - public void onSuccess(Void value) { - try { - bkLedgerManager.startReadAhead( - new LedgerReadPosition(getStartDLSN()), - failureInjector); - if (disableReadAheadLogSegmentsNotification) { - bkLedgerManager.disableReadAheadLogSegmentsNotification(); + if (null == readAheadReader) { + try { + final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( + getStreamName(), + getStartDLSN(), + bkDistributedLogManager.getConf(), + readHandler, + bkDistributedLogManager.getReaderEntryStore(), + bkDistributedLogManager.getScheduler(), + Ticker.systemTicker(), + bkDistributedLogManager.alertStatsLogger); + readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { + @Override + public void onSuccess(Void value) { + try { + 
readHandler.registerListener(readAheadEntryReader); + readHandler.asyncStartFetchLogSegments() + .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { + @Override + public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { + readAheadEntryReader.addStateChangeNotification(BKAsyncLogReaderDLSN.this); + readAheadEntryReader.start(logSegments.getValue()); + return BoxedUnit.UNIT; + } + }); + } catch (Exception exc) { + notifyOnError(exc); } - } catch (Exception exc) { - notifyOnError(exc); } - } - @Override - public void onFailure(Throwable cause) { - notifyOnError(cause); - } - }); - readAheadStarted = true; + @Override + public void onFailure(Throwable cause) { + notifyOnError(cause); + } + }); + } catch (IOException ioe) { + notifyOnError(ioe); + } } if (checkClosedOrInError("readNext")) { @@ -475,7 +488,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio return closeFuture; } closePromise = closeFuture = new Promise<Void>(); - exception = new ReadCancelledException(bkLedgerManager.getFullyQualifiedName(), "Reader was closed"); + exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed"); setLastException(exception); } @@ -490,7 +503,15 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio cancelAllPendingReads(exception); - FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise); + ReadAheadEntryReader readAheadReader = getReadAheadReader(); + if (null != readAheadReader) { + readHandler.unregisterListener(readAheadReader); + readAheadReader.removeStateChangeNotification(this); + } + Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, + readAheadReader, + readHandler + ).proxyTo(closePromise); return closePromise; } @@ -501,25 +522,26 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio pendingRequests.clear(); } - boolean hasMoreRecords() throws IOException { - 
return hasMoreRecords(bkLedgerManager.readAheadCache); - } - - private synchronized boolean hasMoreRecords(ReadAheadCache cache) throws IOException { - if (cache.getNumCachedEntries() > 0 || null != nextRecord) { + synchronized boolean hasMoreRecords() throws IOException { + if (null == readAheadReader) { + return false; + } + if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) { return true; } else if (null != currentEntry) { nextRecord = currentEntry.nextRecord(); return null != nextRecord; - } else { - return false; } + return false; } private synchronized LogRecordWithDLSN readNextRecord() throws IOException { + if (null == readAheadReader) { + return null; + } if (null == currentEntry) { - currentEntry = bkLedgerManager.getNextReadAheadEntry(); - // no current entry after reading from read head then return null + currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); + // no entry after reading from read ahead then return null if (null == currentEntry) { return null; } @@ -551,10 +573,10 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio Stopwatch runTime = Stopwatch.createStarted(); int iterations = 0; long scheduleCountLocal = scheduleCount.get(); - LOG.debug("{}: Scheduled Background Reader", bkLedgerManager.getFullyQualifiedName()); + LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName()); while(true) { if (LOG.isTraceEnabled()) { - LOG.trace("{}: Executing Iteration: {}", bkLedgerManager.getFullyQualifiedName(), iterations++); + LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++); } PendingReadRequest nextRequest = null; @@ -563,31 +585,32 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio // Queue is empty, nothing to read, return if (null == nextRequest) { - LOG.trace("{}: Queue Empty waiting for Input", bkLedgerManager.getFullyQualifiedName()); + LOG.trace("{}: Queue Empty 
waiting for Input", readHandler.getFullyQualifiedName()); scheduleCount.set(0); backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); return; } if (disableProcessingReadRequests) { - LOG.info("Reader of {} is forced to stop processing read requests", bkLedgerManager.getFullyQualifiedName()); + LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName()); return; } } + lastProcessTime.reset().start(); // If the oldest pending promise is interrupted then we must mark // the reader in error and abort all pending reads since we dont // know the last consumed read if (null == lastException.get()) { if (nextRequest.getPromise().isInterrupted().isDefined()) { - setLastException(new DLInterruptedException("Interrupted on reading " + bkLedgerManager.getFullyQualifiedName() + " : ", + setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ", nextRequest.getPromise().isInterrupted().get())); } } if (checkClosedOrInError("readNext")) { if (!(lastException.get().getCause() instanceof LogNotFoundException)) { - LOG.warn("{}: Exception", bkLedgerManager.getFullyQualifiedName(), lastException.get()); + LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get()); } backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); return; @@ -595,7 +618,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio try { // Fail 10% of the requests when asked to simulate errors - if (failureInjector.shouldInjectErrors()) { + if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) { throw new IOException("Reader Simulated Exception"); } LogRecordWithDLSN record; @@ -609,15 +632,15 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio } else { if (record.isEndOfStream() && !returnEndOfStreamRecord) { setLastException(new 
EndOfStreamException("End of Stream Reached for " - + bkLedgerManager.getFullyQualifiedName())); + + readHandler.getFullyQualifiedName())); break; } // gap detection if (recordPositionsContainsGap(record, lastPosition)) { - bkDistributedLogManager.raiseAlert("Gap detected between records at dlsn = {}", record.getDlsn()); + bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record); if (positionGapDetectionEnabled) { - throw new DLIllegalStateException("Gap detected between records at dlsn = " + record.getDlsn()); + throw new DLIllegalStateException("Gap detected between records at record = " + record); } } lastPosition = record.getLastPositionWithinLogSegment(); @@ -628,7 +651,7 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio } catch (IOException exc) { setLastException(exc); if (!(exc instanceof LogNotFoundException)) { - LOG.warn("{} : read with skip Exception", bkLedgerManager.getFullyQualifiedName(), lastException.get()); + LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get()); } continue; } @@ -709,13 +732,12 @@ class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotificatio @VisibleForTesting void simulateErrors() { - failureInjector.injectErrors(true); + bkDistributedLogManager.getFailureInjector().injectErrors(true); } @VisibleForTesting synchronized void disableReadAheadLogSegmentsNotification() { - disableReadAheadLogSegmentsNotification = true; - bkLedgerManager.disableReadAheadLogSegmentsNotification(); + readHandler.disableReadAheadLogSegmentsNotification(); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index cd3f359..4963787 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -34,6 +34,10 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; import com.twitter.distributedlog.function.CloseAsyncCloseableFunction; import com.twitter.distributedlog.function.GetVersionedValueFunction; +import com.twitter.distributedlog.impl.logsegment.BKLogSegmentEntryStore; +import com.twitter.distributedlog.injector.AsyncFailureInjector; +import com.twitter.distributedlog.injector.AsyncRandomFailureInjector; +import com.twitter.distributedlog.logsegment.LogSegmentEntryStore; import com.twitter.distributedlog.metadata.LogMetadataForReader; import com.twitter.distributedlog.metadata.LogMetadataForWriter; import com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore; @@ -141,7 +145,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL private final FeatureProvider featureProvider; private final StatsLogger statsLogger; private final StatsLogger perLogStatsLogger; - private final AlertStatsLogger alertStatsLogger; + final AlertStatsLogger alertStatsLogger; // log stream metadata stores private final LogStreamMetadataStore writerMetadataStore; @@ -159,6 +163,7 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL private final boolean ownWriterBKC; private final BookKeeperClientBuilder readerBKCBuilder; private final BookKeeperClient readerBKC; + private LogSegmentEntryStore readerEntryStore = null; private final boolean ownReaderBKC; // @@ -176,6 +181,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL private final PendingReaders pendingReaders; private final 
ReadAheadExceptionsLogger readAheadExceptionsLogger; + // Failure Injector + private final AsyncFailureInjector failureInjector; + /** * Create a DLM for testing. * @@ -303,6 +311,16 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL this.ledgerAllocator = ledgerAllocator; this.writeLimiter = writeLimiter; + // Failure Injection + this.failureInjector = AsyncRandomFailureInjector.newBuilder() + .injectDelays(conf.getEIInjectReadAheadDelay(), + conf.getEIInjectReadAheadDelayPercent(), + conf.getEIInjectMaxReadAheadDelayMs()) + .injectErrors(false, 10) + .injectStops(conf.getEIInjectReadAheadStall(), 10) + .injectCorruption(conf.getEIInjectReadAheadBrokenEntries()) + .build(); + if (null == writerMetadataStore) { this.writerMetadataStore = new ZKLogStreamMetadataStore( clientId, @@ -413,6 +431,18 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return this.readerBKC; } + synchronized LogSegmentEntryStore getReaderEntryStore() throws IOException { + if (null == readerEntryStore) { + readerEntryStore = new BKLogSegmentEntryStore( + conf, + readerBKC.get(), + scheduler, + statsLogger, + failureInjector); + } + return this.readerEntryStore; + } + @VisibleForTesting FuturePool getReaderFuturePool() { return this.readerFuturePool; @@ -423,6 +453,10 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return this.featureProvider; } + AsyncFailureInjector getFailureInjector() { + return this.failureInjector; + } + private synchronized BKLogReadHandler getReadHandlerAndRegisterListener( boolean create, LogSegmentListener listener) { if (null == readHandlerForListener && create) { @@ -432,7 +466,9 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL readHandlerForListener.asyncStartFetchLogSegments(); return readHandlerForListener; } - readHandlerForListener.registerListener(listener); + if (null != readHandlerForListener && null != listener) { + 
readHandlerForListener.registerListener(listener); + } return readHandlerForListener; } @@ -493,13 +529,11 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL return createReadHandler( subscriberId, null, - true, /* deserialize record set */ isHandleForReading); } synchronized BKLogReadHandler createReadHandler(Optional<String> subscriberId, AsyncNotification notification, - boolean deserializeRecordSet, boolean isHandleForReading) { LogMetadataForReader logMetadata = LogMetadataForReader.of(uri, name, streamIdentifier); return new BKLogReadHandler( @@ -511,15 +545,12 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL readerMetadataStore, logSegmentMetadataCache, scheduler, - readAheadScheduler, alertStatsLogger, - readAheadExceptionsLogger, statsLogger, perLogStatsLogger, clientId, notification, - isHandleForReading, - deserializeRecordSet); + isHandleForReading); } // Create Ledger Allocator @@ -930,7 +961,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL fromDLSN, subscriberId, false, - dynConf.getDeserializeRecordSetOnReads(), statsLogger); pendingReaders.add(reader); return Future.value(reader); @@ -969,7 +999,6 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL fromDLSN.isPresent() ? 
fromDLSN.get() : DLSN.InitialDLSN, subscriberId, false, - dynConf.getDeserializeRecordSetOnReads(), statsLogger); pendingReaders.add(reader); final Future<Void> lockFuture = reader.lockStream(); @@ -1046,15 +1075,14 @@ class BKDistributedLogManager extends ZKMetadataAccessor implements DistributedL LogReader getInputStreamInternal(DLSN fromDLSN, Optional<Long> fromTxnId) throws IOException { - LOG.info("Create async reader starting from {}", fromDLSN); + LOG.info("Create sync reader starting from {}", fromDLSN); checkClosedOrInError("getInputStream"); - LogReader reader = new BKSyncLogReaderDLSN( + return new BKSyncLogReaderDLSN( conf, this, fromDLSN, fromTxnId, statsLogger); - return reader; } /** http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java index 9cfe1a6..67c584c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Ticker; import com.twitter.distributedlog.callback.LogSegmentListener; import com.twitter.distributedlog.callback.LogSegmentNamesListener; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; @@ -36,14 +35,10 @@ import com.twitter.distributedlog.exceptions.LogNotFoundException; import com.twitter.distributedlog.exceptions.LogSegmentNotFoundException; import com.twitter.distributedlog.exceptions.UnexpectedException; import 
com.twitter.distributedlog.metadata.LogMetadataForReader; -import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.lock.DistributedLock; import com.twitter.distributedlog.logsegment.LogSegmentFilter; import com.twitter.distributedlog.logsegment.LogSegmentMetadataCache; import com.twitter.distributedlog.metadata.LogStreamMetadataStore; -import com.twitter.distributedlog.readahead.ReadAheadWorker; -import com.twitter.distributedlog.stats.BroadCastStatsLogger; -import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger; import com.twitter.distributedlog.util.FutureUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.Utils; @@ -111,13 +106,9 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class); protected final LogMetadataForReader logMetadataForReader; - protected final ReadAheadCache readAheadCache; protected final LedgerHandleCache handleCache; - protected final OrderedScheduler readAheadExecutor; protected final DynamicDistributedLogConfiguration dynConf; - protected ReadAheadWorker readAheadWorker = null; - private final boolean isHandleForReading; private final Optional<String> subscriberId; private DistributedLock readLock; @@ -134,10 +125,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { new Versioned<List<LogSegmentMetadata>>(null, Version.NEW); // stats - private final AlertStatsLogger alertStatsLogger; - private final StatsLogger handlerStatsLogger; private final StatsLogger perLogStatsLogger; - private final ReadAheadExceptionsLogger readAheadExceptionsLogger; /** * Construct a Bookkeeper journal manager. 
@@ -150,15 +138,12 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { LogStreamMetadataStore streamMetadataStore, LogSegmentMetadataCache metadataCache, OrderedScheduler scheduler, - OrderedScheduler readAheadExecutor, AlertStatsLogger alertStatsLogger, - ReadAheadExceptionsLogger readAheadExceptionsLogger, StatsLogger statsLogger, StatsLogger perLogStatsLogger, String clientId, AsyncNotification readerStateNotification, - boolean isHandleForReading, - boolean deserializeRecordSet) { + boolean isHandleForReading) { super(logMetadata, conf, bkcBuilder, @@ -170,13 +155,8 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { clientId); this.logMetadataForReader = logMetadata; this.dynConf = dynConf; - this.readAheadExecutor = readAheadExecutor; - this.alertStatsLogger = alertStatsLogger; this.perLogStatsLogger = isHandleForReading ? perLogStatsLogger : NullStatsLogger.INSTANCE; - this.handlerStatsLogger = - BroadCastStatsLogger.masterslave(this.perLogStatsLogger, statsLogger); - this.readAheadExceptionsLogger = readAheadExceptionsLogger; this.readerStateNotification = readerStateNotification; handleCache = LedgerHandleCache.newBuilder() @@ -184,16 +164,7 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { .conf(conf) .statsLogger(statsLogger) .build(); - readAheadCache = new ReadAheadCache( - getFullyQualifiedName(), - alertStatsLogger, - readerStateNotification, - dynConf.getReadAheadMaxRecords(), - deserializeRecordSet, - Ticker.systemTicker()); - this.subscriberId = subscriberId; - this.isHandleForReading = isHandleForReading; } @VisibleForTesting @@ -290,16 +261,10 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { } lockToClose = readLock; } - return Utils.closeSequence(scheduler, readAheadWorker, lockToClose) + return Utils.closeSequence(scheduler, lockToClose) .flatMap(new AbstractFunction1<Void, Future<Void>>() { @Override public 
Future<Void> apply(Void result) { - if (null != readAheadCache) { - readAheadCache.clear(); - } - if (null != readAheadWorker) { - unregisterListener(readAheadWorker); - } if (null != handleCache) { handleCache.clear(); } @@ -361,57 +326,6 @@ class BKLogReadHandler extends BKLogHandler implements LogSegmentNamesListener { }); } - public void startReadAhead(LedgerReadPosition startPosition, - AsyncFailureInjector failureInjector) { - if (null == readAheadWorker) { - readAheadWorker = new ReadAheadWorker( - conf, - dynConf, - logMetadataForReader, - this, - readAheadExecutor, - handleCache, - startPosition, - readAheadCache, - isHandleForReading, - readAheadExceptionsLogger, - handlerStatsLogger, - perLogStatsLogger, - alertStatsLogger, - failureInjector, - readerStateNotification); - registerListener(readAheadWorker); - // start the readahead worker after the log segments are fetched - asyncStartFetchLogSegments().map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { - @Override - public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { - readAheadWorker.start(logSegments.getValue()); - return BoxedUnit.UNIT; - } - }); - } - } - - public boolean isReadAheadCaughtUp() { - return null != readAheadWorker && readAheadWorker.isCaughtUp(); - } - - public LedgerHandleCache getHandleCache() { - return handleCache; - } - - public Entry.Reader getNextReadAheadEntry() throws IOException { - return readAheadCache.getNextReadAheadEntry(); - } - - public Entry.Reader getNextReadAheadEntry(long waitTime, TimeUnit waitTimeUnit) throws IOException { - return readAheadCache.getNextReadAheadEntry(waitTime, waitTimeUnit); - } - - public ReadAheadCache getReadAheadCache() { - return readAheadCache; - } - @VisibleForTesting void disableReadAheadLogSegmentsNotification() { logSegmentsNotificationDisabled = true; 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java index 0f6db75..adf49a1 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKSyncLogReaderDLSN.java @@ -19,14 +19,18 @@ package com.twitter.distributedlog; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.base.Ticker; import com.twitter.distributedlog.exceptions.EndOfStreamException; import com.twitter.distributedlog.exceptions.IdleReaderException; -import com.twitter.distributedlog.injector.AsyncFailureInjector; import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.distributedlog.util.Utils; import com.twitter.util.Future; import com.twitter.util.Promise; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.LinkedList; @@ -39,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; */ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { + private final BKDistributedLogManager bkdlm; private final BKLogReadHandler readHandler; private final AtomicReference<IOException> readerException = new AtomicReference<IOException>(null); @@ -48,6 +53,9 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { private boolean positioned = false; private Entry.Reader currentEntry = null; + // readahead reader + 
ReadAheadEntryReader readAheadReader = null; + // idle reader settings private final boolean shouldCheckIdleReader; private final int idleErrorThresholdMillis; @@ -59,19 +67,19 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { BKDistributedLogManager bkdlm, DLSN startDLSN, Optional<Long> startTransactionId, - StatsLogger statsLogger) { + StatsLogger statsLogger) throws IOException { + this.bkdlm = bkdlm; this.readHandler = bkdlm.createReadHandler( Optional.<String>absent(), this, - conf.getDeserializeRecordSetOnReads(), true); this.maxReadAheadWaitTime = conf.getReadAheadWaitTime(); this.idleErrorThresholdMillis = conf.getReaderIdleErrorThresholdMillis(); this.shouldCheckIdleReader = idleErrorThresholdMillis > 0 && idleErrorThresholdMillis < Integer.MAX_VALUE; this.startTransactionId = startTransactionId; - readHandler.startReadAhead( - new LedgerReadPosition(startDLSN), - AsyncFailureInjector.NULL); + + // start readahead + startReadAhead(startDLSN); if (!startTransactionId.isPresent()) { positioned = true; } @@ -81,32 +89,55 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { idleReaderError = syncReaderStatsLogger.getCounter("idle_reader_error"); } + private void startReadAhead(DLSN startDLSN) throws IOException { + readAheadReader = new ReadAheadEntryReader( + bkdlm.getStreamName(), + startDLSN, + bkdlm.getConf(), + readHandler, + bkdlm.getReaderEntryStore(), + bkdlm.getScheduler(), + Ticker.systemTicker(), + bkdlm.alertStatsLogger); + readHandler.registerListener(readAheadReader); + readHandler.asyncStartFetchLogSegments() + .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { + @Override + public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { + readAheadReader.addStateChangeNotification(BKSyncLogReaderDLSN.this); + readAheadReader.start(logSegments.getValue()); + return BoxedUnit.UNIT; + } + }); + } + + @VisibleForTesting + ReadAheadEntryReader getReadAheadReader() { + 
return readAheadReader; + } + @VisibleForTesting BKLogReadHandler getReadHandler() { return readHandler; } - // reader is still catching up, waiting for next record - private Entry.Reader readNextEntry(boolean nonBlocking) throws IOException { Entry.Reader entry = null; if (nonBlocking) { - return readHandler.getNextReadAheadEntry(); + return readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); } else { - while (!readHandler.isReadAheadCaughtUp() + while (!readAheadReader.isReadAheadCaughtUp() && null == readerException.get() && null == entry) { - entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime, - TimeUnit.MILLISECONDS); + entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS); } if (null != entry) { return entry; } // reader is caught up - if (readHandler.isReadAheadCaughtUp() + if (readAheadReader.isReadAheadCaughtUp() && null == readerException.get()) { - entry = readHandler.getNextReadAheadEntry(maxReadAheadWaitTime, - TimeUnit.MILLISECONDS); + entry = readAheadReader.getNextReadAheadEntry(maxReadAheadWaitTime, TimeUnit.MILLISECONDS); } return entry; } @@ -121,30 +152,24 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { throw ire; } - @Override public synchronized LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException { if (null != readerException.get()) { throw readerException.get(); } - LogRecordWithDLSN record = doReadNext(nonBlocking); - // no record is returned, check if the reader becomes idle if (null == record && shouldCheckIdleReader) { - ReadAheadCache cache = readHandler.getReadAheadCache(); - if (cache.getNumCachedEntries() <= 0 && - cache.isReadAheadIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) { + if (readAheadReader.getNumCachedEntries() <= 0 && + readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS)) { markReaderAsIdle(); } } - return record; } - private synchronized LogRecordWithDLSN doReadNext(boolean nonBlocking) - 
throws IOException { + private LogRecordWithDLSN doReadNext(boolean nonBlocking) throws IOException { LogRecordWithDLSN record = null; do { @@ -217,7 +242,12 @@ class BKSyncLogReaderDLSN implements LogReader, AsyncNotification { } closeFuture = closePromise = new Promise<Void>(); } - readHandler.asyncClose().proxyTo(closePromise); + readHandler.unregisterListener(readAheadReader); + readAheadReader.removeStateChangeNotification(this); + Utils.closeSequence(bkdlm.getScheduler(), true, + readAheadReader, + readHandler + ).proxyTo(closePromise); return closePromise; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java index b1bd701..bf315fc 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/Entry.java @@ -358,6 +358,20 @@ public class Entry { public interface Reader { /** + * Get the log segment sequence number. + * + * @return the log segment sequence number. + */ + long getLSSN(); + + /** + * Return the entry id. + * + * @return the entry id. + */ + long getEntryId(); + + /** * Read next log record from this record set. * * @return next log record from this record set. 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java new file mode 100644 index 0000000..0a15d29 --- /dev/null +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EntryPosition.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog; + +/** + * The position of an entry, identified by log segment sequence number and entry id. 
+ */ +class EntryPosition { + + private long lssn; + private long entryId; + + EntryPosition(long lssn, long entryId) { + this.lssn = lssn; + this.entryId = entryId; + } + + public synchronized long getLogSegmentSequenceNumber() { + return lssn; + } + + public synchronized long getEntryId() { + return entryId; + } + + public synchronized boolean advance(long lssn, long entryId) { + if (lssn == this.lssn) { + if (entryId <= this.entryId) { + return false; + } + this.entryId = entryId; + return true; + } else if (lssn > this.lssn) { + this.lssn = lssn; + this.entryId = entryId; + return true; + } else { + return false; + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("(").append(lssn).append(", ").append(entryId).append(")"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java index 79e4408..038bb18 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/EnvelopedEntryReader.java @@ -57,6 +57,16 @@ class EnvelopedEntryReader implements Entry.Reader, RecordStream { } @Override + public long getLSSN() { + return logSegmentSeqNo; + } + + @Override + public long getEntryId() { + return entryId; + } + + @Override public LogRecordWithDLSN nextRecord() throws IOException { return reader.readOp(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7a977972/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java index ed7218e..c3948df 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/MaxTxId.java @@ -66,7 +66,7 @@ class MaxTxId { } public Versioned<Long> getVersionedData(long txId) { - return new Versioned<Long>(Math.max(txId, currentMax), version); + return new Versioned<Long>(Math.max(txId, get()), version); } }