This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new a0f7da9b23 Introduce Periodic mode to Accord Journal a0f7da9b23 is described below commit a0f7da9b2355d9cf7fddea59c882a76f4acbe73f Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Thu May 23 14:20:09 2024 +0100 Introduce Periodic mode to Accord Journal patch by Benedict; reviewed by Aleksey Yeschenko, Alex Petrov and David Capwell for CASSANDRA-19720 --- .../org/apache/cassandra/config/AccordSpec.java | 4 +- .../cassandra/config/DatabaseDescriptor.java | 3 - .../apache/cassandra/journal/ActiveSegment.java | 37 ++- src/java/org/apache/cassandra/journal/Flusher.java | 312 +++++++++++++++------ src/java/org/apache/cassandra/journal/Journal.java | 53 +++- .../apache/cassandra/journal/SegmentWriter.java | 4 +- .../org/apache/cassandra/journal/Segments.java | 15 + .../apache/cassandra/journal/SyncedOffsets.java | 36 ++- .../service/accord/AccordCommandStore.java | 1 + .../cassandra/service/accord/AccordJournal.java | 8 +- .../cassandra/service/accord/AccordKeyspace.java | 68 +++-- .../service/accord/AccordObjectSizes.java | 2 +- .../cassandra/service/accord/AccordService.java | 2 +- .../accord/serializers/CheckStatusSerializers.java | 3 +- .../accord/serializers/CommandSerializers.java | 32 +++ .../accord/serializers/FetchSerializers.java | 13 +- .../accord/serializers/TopologySerializers.java | 15 +- .../accord/serializers/WaitingOnSerializer.java | 81 ++++-- .../org/apache/cassandra/utils/ByteBufferUtil.java | 2 +- test/conf/logback-simulator.xml | 2 +- .../cassandra/distributed/test/TestBaseImpl.java | 4 + .../distributed/test/accord/AccordLoadTest.java | 97 ++++++- .../cassandra/simulator/ClusterSimulation.java | 14 +- .../cassandra/simulator/SimulationRunner.java | 2 +- .../simulator/paxos/AccordClusterSimulation.java | 10 +- .../paxos/PairOfSequencesAccordSimulation.java | 6 +- .../simulator/paxos/PaxosClusterSimulation.java | 5 - .../simulator/paxos/PaxosSimulationRunner.java | 2 +- .../cassandra/journal/SyncedOffsetsTest.java | 4 +- .../serializers/WaitingOnSerializerTest.java | 2 +- 30 files changed, 620 insertions(+), 219 deletions(-) diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index b035b0b9b5..2e0d614957 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -30,7 +30,7 @@ public class AccordSpec public volatile OptionaldPositiveInt shard_count = OptionaldPositiveInt.UNDEFINED; - public volatile DurationSpec.IntSecondsBound progress_log_schedule_delay = new DurationSpec.IntSecondsBound(1); + public volatile DurationSpec.IntMillisecondsBound progress_log_schedule_delay = new DurationSpec.IntMillisecondsBound(100); /** * When a barrier transaction is requested how many times to repeat attempting the barrier before giving up @@ -79,7 +79,7 @@ public class AccordSpec { public int segmentSize = 32 << 20; public FailurePolicy failurePolicy = FailurePolicy.STOP; - public FlushMode flushMode = FlushMode.BATCH; + public FlushMode flushMode = FlushMode.PERIODIC; public DurationSpec.IntMillisecondsBound flushPeriod; // pulls default from 'commitlog_sync_period' public DurationSpec.IntMillisecondsBound periodicFlushLagBlock = new DurationSpec.IntMillisecondsBound("1500ms"); diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 285d7308b7..c98182cdd3 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -154,9 +154,6 @@ import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome; public class DatabaseDescriptor { - public static final String NO_ACCORD_PAXOS_STRATEGY_WITH_ACCORD_DISABLED_MESSAGE = - "Cannot use lwt_strategy \"accord\" while Accord transactions are disabled."; - static { // This static block covers most usages diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java index f16126c157..1fd2e4dd1a 100644 --- a/src/java/org/apache/cassandra/journal/ActiveSegment.java +++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java @@ -24,6 +24,7 @@ import java.nio.channels.FileChannel; import java.nio.file.StandardOpenOption; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.locks.LockSupport; import com.codahale.metrics.Timer; @@ -50,6 +51,9 @@ final class ActiveSegment<K, V> extends Segment<K, V> * Everything before this offset has been written and flushed. */ private volatile int lastFlushedOffset = 0; + private volatile int lastFsyncOffset = 0; + @SuppressWarnings("rawtypes") + private static final AtomicIntegerFieldUpdater<ActiveSegment> lastFsyncOffsetUpdater = AtomicIntegerFieldUpdater.newUpdater(ActiveSegment.class, "lastFsyncOffset"); /* * End position of the buffer; initially set to its capacity and @@ -86,7 +90,7 @@ final class ActiveSegment<K, V> extends Segment<K, V> @SuppressWarnings("resource") static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport) { - SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true); + SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor); InMemoryIndex<K> index = InMemoryIndex.create(keySupport); Metadata metadata = Metadata.create(); return new ActiveSegment<>(descriptor, params, syncedOffsets, index, metadata, keySupport); @@ -152,7 +156,7 @@ final class ActiveSegment<K, V> extends Segment<K, V> boolean isEmpty = discardUnusedTail(); if (!isEmpty) { - flush(); + flush(true); if (persistComponents) persistComponents(); } release(); @@ -261,21 +265,37 @@ final class ActiveSegment<K, V> extends Segment<K, V> * TODO FIXME: calls from outside Flusher + callbacks * @return last synced offset */ - synchronized int flush() + synchronized int flush(boolean fsync) { int allocatePosition = this.allocatePosition.get(); if (lastFlushedOffset >= allocatePosition) return lastFlushedOffset; waitForModifications(); - flushInternal(); + if (fsync) + { + fsyncInternal(); + lastFsyncOffsetUpdater.accumulateAndGet(this, allocatePosition, Math::max); + } lastFlushedOffset = allocatePosition; int syncedOffset = Math.min(allocatePosition, endOfBuffer); - syncedOffsets.mark(syncedOffset); + syncedOffsets.mark(syncedOffset, fsync); flushComplete.signalAll(); return syncedOffset; } + // provides no ordering guarantees + void fsync() + { + int lastFlushed = lastFlushedOffset; + if (lastFsyncOffset >= lastFlushed) + return; + + fsyncInternal(); + syncedOffsets.fsync(); + lastFsyncOffsetUpdater.accumulateAndGet(this, lastFlushed, Math::max); + } + private void waitForFlush(int position) { while (lastFlushedOffset < position) @@ -297,7 +317,7 @@ final class ActiveSegment<K, V> extends Segment<K, V> appendOrder.awaitNewBarrier(); } - private void flushInternal() + private void fsyncInternal() { try { @@ -314,6 +334,11 @@ final class ActiveSegment<K, V> extends Segment<K, V> return syncedOffset >= endOfBuffer; } + boolean isCompletedAndFullyFsynced() + { + return lastFsyncOffset >= endOfBuffer; + } + /** * Ensures no more of this segment is writeable, by allocating any unused section at the end * and marking it discarded void discartUnusedTail() diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java index c4c6d75348..607943b635 100644 --- a/src/java/org/apache/cassandra/journal/Flusher.java +++ b/src/java/org/apache/cassandra/journal/Flusher.java @@ -17,12 +17,15 @@ */ package org.apache.cassandra.journal; -import java.util.ArrayList; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; + +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.utils.Invariants; import com.codahale.metrics.Timer; import org.apache.cassandra.concurrent.Interruptible; import org.apache.cassandra.concurrent.Interruptible.TerminateException; @@ -33,7 +36,6 @@ import org.apache.cassandra.utils.concurrent.Semaphore; import org.apache.cassandra.utils.concurrent.WaitQueue; import static java.lang.String.format; -import static java.util.Comparator.comparing; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; @@ -42,7 +44,7 @@ import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SY import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE; import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL; import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN; -import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; +import static org.apache.cassandra.journal.Params.FlushMode.PERIODIC; import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime; import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK; @@ -60,16 +62,19 @@ final class Flusher<K, V> private final AsyncCallbacks<K, V> callbacks; private volatile Interruptible flushExecutor; + private volatile Interruptible fsyncExecutor; // counts of total pending write and written entries private final AtomicLong pending = new AtomicLong(0); private final AtomicLong written = new AtomicLong(0); - // all Allocations written before this time will be flushed - volatile long lastFlushedAt = currentTimeMillis(); + // the time of the last initiated flush + volatile long flushStartedAt = nanoTime(); + // the time of the earliest flush that has completed an fsync; all Allocations written before this time are durable + volatile long fsyncFinishedFor = flushStartedAt; // a signal that writers can wait on to be notified of a completed flush in PERIODIC FlushMode - private final WaitQueue flushComplete = newWaitQueue(); + private final WaitQueue fsyncComplete = newWaitQueue(); // TODO (expected): this is only used for testing, can we remove this? // a signal and flag that callers outside the flusher thread can use // to signal they want the journal segments to be flushed to disk @@ -97,20 +102,144 @@ final class Flusher<K, V> void shutdown() { flushExecutor.shutdown(); + if (fsyncExecutor != null) + fsyncExecutor.shutdown(); } @Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT}) private class FlushRunnable implements Interruptible.Task { - private final MonotonicClock clock; + @Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT}) + private class FSyncRunnable implements Interruptible.Task + { + // this is written only by the Flusher thread, and read only by the Fsync thread + ActiveSegment<K, V> fsyncUpTo; + ActiveSegment<K, V> fsyncing; + + private volatile Thread awaitingWork; + + // all Allocations written before this time will be written to at least the OS page cache; + volatile long fsyncWaitingSince = 0; + // the time of the earliest flush that has begun participating in an fsync + volatile long fsyncStartedFor = 0; + + @Override + public void run(Interruptible.State state) throws InterruptedException + { + try + { + doRun(state); + } + catch (Throwable t) + { + if (!journal.handleError("Failed to flush segments to disk", t)) + throw new TerminateException(); + } + } + + private void awaitWork() throws InterruptedException + { + long lastStartedAt = fsyncStartedFor; + if (fsyncWaitingSince != lastStartedAt) + return; + + awaitingWork = Thread.currentThread(); + do + { + if (Thread.interrupted()) + { + awaitingWork = null; + throw new InterruptedException(); + } + + LockSupport.park(); + } + while (fsyncWaitingSince == lastStartedAt); + + awaitingWork = null; + } + + void notify(Thread notify) + { + if (notify != null) + LockSupport.unpark(notify); + } + + public void doRun(Interruptible.State state) throws InterruptedException + { + awaitWork(); + if (fsyncing == null) + fsyncing = journal.oldestActiveSegment(); + + // invert order of access; we might see a future fsyncTo, but at worst this means redundantly invoking fsync before updating fsyncStartedFor + long startedAt = fsyncWaitingSince; + ActiveSegment<K, V> fsyncTo = this.fsyncUpTo; + fsyncStartedFor = startedAt; + // synchronized to prevent thread interrupts while performing IO operations and also + // clear interrupted status to prevent ClosedByInterruptException in ActiveSegment::flush + synchronized (this) + { + boolean ignore = Thread.interrupted(); + while (fsyncing != fsyncTo) + { + fsyncing.fsync(); + journal.closeActiveSegmentAndOpenAsStatic(fsyncing); + fsyncing = journal.getActiveSegment(fsyncing.descriptor.timestamp + 1); + } + fsyncing.fsync(); + } + fsyncFinishedFor = startedAt; + fsyncComplete.signalAll(); + long finishedAt = clock.now(); + processDuration(startedAt, finishedAt); + } + + void afterFlush(long startedAt, ActiveSegment<K, V> segment, int syncedOffset) + { + long requireFsyncTo = startedAt - periodicFlushLagBlockNanos(); + + fsyncUpTo = segment; + fsyncWaitingSince = startedAt; + + notify(awaitingWork); + + if (requireFsyncTo > fsyncFinishedFor) + awaitFsyncAt(requireFsyncTo, journal.metrics.waitingOnFlush.time()); + callbacks.onFlush(segment.descriptor.timestamp, syncedOffset); + } + + private void doNoOpFlush(long startedAt) + { + if (fsyncFinishedFor >= fsyncWaitingSince) + { + fsyncFinishedFor = startedAt; + } + else + { + // if the flusher is still running, update the waitingSince register + fsyncWaitingSince = startedAt; + notify(awaitingWork); + } + } + } + private final NoSpamLogger noSpamLogger; + private final MonotonicClock clock; + private final @Nullable FSyncRunnable fSyncRunnable; + + private ActiveSegment<K, V> current = null; - private final ArrayList<ActiveSegment<K, V>> segmentsToFlush = new ArrayList<>(); + private long firstLaggedAt = Long.MIN_VALUE; // first lag ever or since last logged warning + private int fsyncCount = 0; // flush count since firstLaggedAt + private int lagCount = 0; // lag count since firstLaggedAt + private long duration = 0; // time spent flushing since firstLaggedAt + private long lagDuration = 0; // cumulative lag since firstLaggedAt FlushRunnable(MonotonicClock clock) { - this.clock = clock; this.noSpamLogger = NoSpamLogger.wrap(logger, 5, MINUTES); + this.clock = clock; + this.fSyncRunnable = params.flushMode() == PERIODIC ? newFsyncRunnable() : null; } @Override @@ -131,8 +260,9 @@ final class Flusher<K, V> public void doRun(Interruptible.State state) throws InterruptedException { - long startedRunAt = clock.now(); - boolean flushToDisk = lastFlushedAt + flushPeriodNanos() <= startedRunAt || state != NORMAL || flushRequested; + long startedAt = clock.now(); + long flushPeriodNanos = flushPeriodNanos(); + boolean flushToDisk = flushStartedAt + flushPeriodNanos <= startedAt || state != NORMAL || flushRequested; // synchronized to prevent thread interrupts while performing IO operations and also // clear interrupted status to prevent ClosedByInterruptException in ActiveSegment::flush @@ -142,83 +272,71 @@ final class Flusher<K, V> if (flushToDisk) { flushRequested = false; - doFlush(); - lastFlushedAt = startedRunAt; - flushComplete.signalAll(); + flushStartedAt = startedAt; + doFlush(startedAt); } } - long now = clock.now(); - if (flushToDisk) - processFlushDuration(startedRunAt, now); - if (state == SHUTTING_DOWN) return; - long flushPeriodNanos = flushPeriodNanos(); if (flushPeriodNanos <= 0) { haveWork.acquire(1); } else { - long wakeUpAt = startedRunAt + flushPeriodNanos; - if (wakeUpAt > now) - haveWork.tryAcquireUntil(1, wakeUpAt); + long wakeUpAt = startedAt + flushPeriodNanos; + haveWork.tryAcquireUntil(1, wakeUpAt); } } - private void doFlush() + private void doFlush(long startedAt) throws InterruptedException { - journal.selectSegmentToFlush(segmentsToFlush); - segmentsToFlush.sort(comparing(s -> s.descriptor)); + boolean synchronousFsync = fSyncRunnable == null; - try - { - long syncedSegment = -1; - int syncedOffset = -1; + if (current == null) + current = journal.oldestActiveSegment(); + ActiveSegment<K, V> newCurrent = journal.currentActiveSegment(); - for (ActiveSegment<K, V> segment : segmentsToFlush) - { - if (!segment.shouldFlush()) - break; + if (newCurrent == current && (newCurrent == null || !newCurrent.shouldFlush())) + { + if (synchronousFsync) fsyncFinishedFor = startedAt; + else fSyncRunnable.doNoOpFlush(startedAt); + return; + } - syncedSegment = segment.descriptor.timestamp; - syncedOffset = segment.flush(); + Invariants.checkState(newCurrent != null); - // if an older segment isn't fully complete + flushed yet, don't attempt to flush any younger ones - if (!segment.isCompletedAndFullyFlushed(syncedOffset)) - break; + try + { + while (current != newCurrent) + { + current.discardUnusedTail(); + current.flush(synchronousFsync); + if (synchronousFsync) + journal.closeActiveSegmentAndOpenAsStatic(current); + current = journal.getActiveSegment(current.descriptor.timestamp + 1); } + int syncedOffset = current.flush(synchronousFsync); - // invoke the onFlush() callback once, covering entire flushed range across all flushed segments - if (syncedSegment != -1 && syncedOffset != -1) - callbacks.onFlush(syncedSegment, syncedOffset); + if (synchronousFsync) afterFSync(startedAt, current.descriptor.timestamp, syncedOffset); + else fSyncRunnable.afterFlush(startedAt, current, syncedOffset); } catch (Throwable t) { callbacks.onFlushFailed(t); throw t; } - finally - { - segmentsToFlush.clear(); - } } - private long firstLaggedAt = Long.MIN_VALUE; // first lag ever or since last logged warning - private int flushCount = 0; // flush count since firstLaggedAt - private int lagCount = 0; // lag count since firstLaggedAt - private long flushDuration = 0; // time spent flushing since firstLaggedAt - private long lagDuration = 0; // cumulative lag since firstLaggedAt - - private void processFlushDuration(long startedFlushAt, long finishedFlushAt) + private void processDuration(long startedFlushAt, long finishedFsyncAt) { - flushCount++; - flushDuration += (finishedFlushAt - startedFlushAt); + fsyncCount++; + duration += (finishedFsyncAt - startedFlushAt); long flushPeriodNanos = flushPeriodNanos(); - long lag = finishedFlushAt - (startedFlushAt + flushPeriodNanos); + long lag = finishedFsyncAt - (startedFlushAt + flushPeriodNanos); if (flushPeriodNanos <= 0 || lag <= 0) return; @@ -226,26 +344,42 @@ final class Flusher<K, V> lagDuration += lag; if (firstLaggedAt == Long.MIN_VALUE) - firstLaggedAt = finishedFlushAt; + firstLaggedAt = finishedFsyncAt; boolean logged = - noSpamLogger.warn(finishedFlushAt, - "Out of {} {} journal flushes over the past {}s with average duration of {}ms, " + - "{} have exceeded the configured flush period by an average of {}ms", - flushCount, - journal.name, - format("%.2f", (finishedFlushAt - firstLaggedAt) * 1e-9d), - format("%.2f", flushDuration * 1e-6d / flushCount), - lagCount, - format("%.2f", lagDuration * 1e-6d / lagCount)); + noSpamLogger.warn(finishedFsyncAt, + "Out of {} {} journal flushes over the past {}s with average duration of {}ms, " + + "{} have exceeded the configured flush period by an average of {}ms", + fsyncCount, + journal.name, + format("%.2f", (finishedFsyncAt - firstLaggedAt) * 1e-9d), + format("%.2f", duration * 1e-6d / fsyncCount), + lagCount, + format("%.2f", lagDuration * 1e-6d / lagCount)); if (logged) // reset metrics for next log statement { firstLaggedAt = Long.MIN_VALUE; - flushCount = lagCount = 0; - flushDuration = lagDuration = 0; + fsyncCount = lagCount = 0; + duration = lagDuration = 0; } } + + private void afterFSync(long startedAt, long syncedSegment, int syncedOffset) + { + fsyncFinishedFor = startedAt; + callbacks.onFlush(syncedSegment, syncedOffset); + fsyncComplete.signalAll(); + long finishedAt = clock.now(); + processDuration(startedAt, finishedAt); + } + + private FSyncRunnable newFsyncRunnable() + { + final FSyncRunnable fSyncRunnable = new FSyncRunnable(); + fsyncExecutor = executorFactory().infiniteLoop(journal.name + "-fsync", fSyncRunnable, SAFE, NON_DAEMON, SYNCHRONIZED); + return fSyncRunnable; + } } @FunctionalInterface @@ -295,48 +429,40 @@ final class Flusher<K, V> written.incrementAndGet(); } - private void asyncFlushBatch(ActiveSegment<K, V>.Allocation alloc) + private void waitForFlushGroup(ActiveSegment<K, V>.Allocation alloc) { pending.incrementAndGet(); - requestExtraFlush(); - // alloc.awaitFlush(journal.metrics.waitingOnFlush); // TODO (expected): collect async flush metrics + alloc.awaitFlush(journal.metrics.waitingOnFlush); pending.decrementAndGet(); written.incrementAndGet(); } - private void waitForFlushGroup(ActiveSegment<K, V>.Allocation alloc) + private void waitForFlushPeriodic(ActiveSegment<K, V>.Allocation ignore) { - pending.incrementAndGet(); - alloc.awaitFlush(journal.metrics.waitingOnFlush); - pending.decrementAndGet(); + long expectedFlushTime = nanoTime() - periodicFlushLagBlockNanos(); + if (fsyncFinishedFor < expectedFlushTime) + { + pending.incrementAndGet(); + awaitFsyncAt(expectedFlushTime, journal.metrics.waitingOnFlush.time()); + pending.decrementAndGet(); + } written.incrementAndGet(); } - private void asyncFlushGroup(ActiveSegment<K, V>.Allocation alloc) + private void asyncFlushBatch(ActiveSegment<K, V>.Allocation alloc) { - pending.incrementAndGet(); - // alloc.awaitFlush(journal.metrics.waitingOnFlush); // TODO (expected): collect async flush metrics - pending.decrementAndGet(); + requestExtraFlush(); written.incrementAndGet(); } - private void waitForFlushPeriodic(ActiveSegment<K, V>.Allocation alloc) + private void asyncFlushGroup(ActiveSegment<K, V>.Allocation alloc) { - long expectedFlushTime = nanoTime() - periodicFlushLagBlockNanos(); - if (lastFlushedAt < expectedFlushTime) - { - pending.incrementAndGet(); - awaitFlushAt(expectedFlushTime, journal.metrics.waitingOnFlush.time()); - pending.decrementAndGet(); - } written.incrementAndGet(); } private void asyncFlushPeriodic(ActiveSegment<K, V>.Allocation ignore) { - pending.incrementAndGet(); - // awaitFlushAt(expectedFlushTime, journal.metrics.waitingOnFlush.time()); // TODO (expected): collect async flush metrics - pending.decrementAndGet(); + requestExtraFlush(); written.incrementAndGet(); } @@ -350,17 +476,17 @@ final class Flusher<K, V> haveWork.release(1); } - private void awaitFlushAt(long flushTime, Timer.Context context) + private void awaitFsyncAt(long flushTime, Timer.Context context) { do { - WaitQueue.Signal signal = flushComplete.register(context, Timer.Context::stop); - if (lastFlushedAt < flushTime) + WaitQueue.Signal signal = fsyncComplete.register(context, Timer.Context::stop); + if (fsyncFinishedFor < flushTime) signal.awaitUninterruptibly(); else signal.cancel(); } - while (lastFlushedAt < flushTime); + while (fsyncFinishedFor < flushTime); } private long flushPeriodNanos() diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java index eae190a15e..aa61e5aca5 100644 --- a/src/java/org/apache/cassandra/journal/Journal.java +++ b/src/java/org/apache/cassandra/journal/Journal.java @@ -35,6 +35,7 @@ import java.util.zip.CRC32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import accord.utils.Invariants; import com.codahale.metrics.Timer.Context; import org.agrona.collections.ObjectHashSet; import org.apache.cassandra.concurrent.Interruptible; @@ -456,9 +457,6 @@ public class Journal<K, V> implements Shutdownable // signal the allocator thread to prepare a new segment wakeAllocator(); - if (null != oldSegment) - closeActiveSegmentAndOpenAsStatic(oldSegment); - // request that the journal be flushed out-of-band, as we've finished a segment flusher.requestExtraFlush(); } @@ -659,6 +657,53 @@ public class Journal<K, V> implements Shutdownable segments().selectActive(currentSegment.descriptor.timestamp, into); } + ActiveSegment<K, V> oldestActiveSegment() + { + ActiveSegment<K, V> current = currentSegment; + if (current == null) + return null; + + ActiveSegment<K, V> oldest = segments().oldestActive(); + if (oldest == null || oldest.descriptor.timestamp > current.descriptor.timestamp) + return current; + + return oldest; + } + + ActiveSegment<K, V> currentActiveSegment() + { + return currentSegment; + } + + ActiveSegment<K, V> getActiveSegment(long timestamp) + { + // we can race with segment addition to the segments() collection, with a new segment appearing in currentSegment first + // since we are most likely to be requesting the currentSegment anyway, we resolve this case by checking currentSegment first + // and resort to the segments() collection only if we do not match + ActiveSegment<K, V> currentSegment = this.currentSegment; + if (currentSegment == null) + throw new IllegalArgumentException("Requested an active segment with timestamp " + timestamp + " but there is no currently active segment"); + long currentSegmentTimestamp = currentSegment.descriptor.timestamp; + if (timestamp == currentSegmentTimestamp) + { + return currentSegment; + } + else if (timestamp > currentSegmentTimestamp) + { + throw new IllegalArgumentException("Requested a newer timestamp " + timestamp + " than the current active segment " + currentSegmentTimestamp); + } + else + { + Segment<K, V> segment = segments().get(timestamp); + Invariants.checkState(segment != null, "Segment %d expected to be found, but neither current segment %d nor in active segments", timestamp, currentSegmentTimestamp); + if (segment == null) + throw new IllegalArgumentException("Request the active segment " + timestamp + " but this segment does not exist"); + if (!segment.isActive()) + throw new IllegalArgumentException("Request the active segment " + timestamp + " but this segment is not active"); + return segment.asActive(); + } + } + /** * Take care of a finished active segment: * 1. discard tail @@ -681,7 +726,7 @@ public class Journal<K, V> implements Shutdownable public void run() { activeSegment.discardUnusedTail(); - activeSegment.flush(); + activeSegment.flush(true); activeSegment.persistComponents(); replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport)); activeSegment.release(); diff --git a/src/java/org/apache/cassandra/journal/SegmentWriter.java b/src/java/org/apache/cassandra/journal/SegmentWriter.java index 852e955b21..b8436aed66 100644 --- a/src/java/org/apache/cassandra/journal/SegmentWriter.java +++ b/src/java/org/apache/cassandra/journal/SegmentWriter.java @@ -101,9 +101,9 @@ final class SegmentWriter<K> implements Closeable throw new JournalWriteError(descriptor, file, e); } - try (SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true)) + try (SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor)) { - syncedOffsets.mark(position()); + syncedOffsets.mark(position(), true); } index.persist(descriptor); diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java index 0693997ef3..ca5ca47b2b 100644 --- a/src/java/org/apache/cassandra/journal/Segments.java +++ b/src/java/org/apache/cassandra/journal/Segments.java @@ -98,6 +98,21 @@ class Segments<K, V> into.add(segment.asActive()); } + ActiveSegment<K, V> oldestActive() + { + Segment<K, V> oldest = null; + for (Segment<K, V> segment : segments.values()) + if (segment.isActive() && (oldest == null || segment.descriptor.timestamp <= oldest.descriptor.timestamp)) + oldest = segment; + + return oldest == null ? null : oldest.asActive(); + } + + Segment<K, V> get(long timestamp) + { + return segments.get(timestamp); + } + void selectStatic(Collection<StaticSegment<K, V>> into) { for (Segment<K, V> segment : segments.values()) diff --git a/src/java/org/apache/cassandra/journal/SyncedOffsets.java b/src/java/org/apache/cassandra/journal/SyncedOffsets.java index bee302d6d8..cd05e6f8ac 100644 --- a/src/java/org/apache/cassandra/journal/SyncedOffsets.java +++ b/src/java/org/apache/cassandra/journal/SyncedOffsets.java @@ -50,7 +50,9 @@ interface SyncedOffsets extends Closeable * * @param offset the offset into datafile, up to which contents have been fsynced (exclusive) */ - void mark(int offset); + void mark(int offset, boolean fsync); + + void fsync(); @Override default void close() @@ -60,9 +62,9 @@ interface SyncedOffsets extends Closeable /** * @return a disk-backed synced offset tracker for a new {@link ActiveSegment} */ - static Active active(Descriptor descriptor, boolean syncOnMark) + static Active active(Descriptor descriptor) { - return new Active(descriptor, syncOnMark); + return new Active(descriptor); } /** @@ -87,15 +89,13 @@ interface SyncedOffsets extends Closeable final class Active implements SyncedOffsets { private final Descriptor descriptor; - private final boolean syncOnMark; private final FileOutputStreamPlus output; private volatile int syncedOffset; - private Active(Descriptor descriptor, boolean syncOnMark) + private Active(Descriptor descriptor) { this.descriptor = descriptor; - this.syncOnMark = syncOnMark; File file = descriptor.fileFor(Component.SYNCED_OFFSETS); if (file.exists()) @@ -123,7 +123,7 @@ interface SyncedOffsets extends Closeable } @Override - public void mark(int offset) + public void mark(int offset, boolean fsync) { if (offset < syncedOffset) throw new IllegalArgumentException("offset " + offset + " is smaller than previous mark " + offset); @@ -142,10 +142,10 @@ interface SyncedOffsets extends Closeable } syncedOffset = offset; - if (syncOnMark) sync(); + if (fsync) fsync(); } - private void sync() + public void fsync() { try { @@ -160,7 +160,7 @@ interface SyncedOffsets extends Closeable @Override public void close() { - if (!syncOnMark) sync(); + fsync(); try { @@ -218,7 +218,13 @@ interface SyncedOffsets extends Closeable } @Override - public void mark(int offset) + public void mark(int offset, boolean fsync) + { + throw new UnsupportedOperationException(); + } + + @Override + public void fsync() { throw new UnsupportedOperationException(); } @@ -235,7 +241,13 @@ interface SyncedOffsets extends Closeable } @Override - public void mark(int offset) + public void mark(int offset, boolean fsync) + { + throw new UnsupportedOperationException(); + } + + @Override + public void fsync() { throw new UnsupportedOperationException(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index c846038fd8..0f33f04d92 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -342,6 +342,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize Runnable saveCommand(Command before, Command after) { Mutation mutation = AccordKeyspace.getCommandMutation(id, before, after, nextSystemTimestampMicros()); + // TODO (required): make sure we test recovering when this has failed to be persisted return null != mutation ? mutation::applyUnsafe : null; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java index 80cfdf31ea..f9daf2354d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java +++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java @@ -297,6 +297,10 @@ public class AccordJournal implements IJournal, Shutdownable // TODO (alexp): tests for objects that go through AccordJournal private class JournalCallbacks implements AsyncCallbacks<Key, Object> { + private JournalCallbacks() + { + } + /** * Queue up the record for either frame aggregation (if a protocol message) or frame application (if a frame). */ @@ -352,7 +356,7 @@ public class AccordJournal implements IJournal, Shutdownable private void onFrameWriteFailed(FrameRecord frame, FrameContext context, Throwable cause) { - // TODO: panic + // TODO (required): panic } @Override @@ -364,7 +368,7 @@ public class AccordJournal implements IJournal, Shutdownable @Override public void onFlushFailed(Throwable cause) { - // TODO: panic + // TODO (required): panic } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 2dd2b5d28d..b96ffa9bd0 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -97,9 +97,8 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.SinglePartitionReadCommand; -import org.apache.cassandra.db.Slices; import org.apache.cassandra.db.filter.ClusteringIndexFilter; -import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; +import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; @@ -161,6 +160,7 @@ import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer; import org.apache.cassandra.utils.Clock.Global; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.BTreeSet; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import static accord.utils.Invariants.checkArgument; @@ -201,7 +201,7 @@ public class AccordKeyspace private static final LocalPartitioner FOR_KEYS_LOCAL_PARTITIONER = new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, BytesType.instance, KEY_TYPE)); - private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexSliceFilter(Slices.ALL, false); + private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), Clustering.EMPTY), false); //TODO (now, performance): should this be partitioner rather than TableId? As of this patch distributed tables should only have 1 partitioner... private static final ConcurrentMap<TableId, AccordRoutingKeyByteSource.Serializer> TABLE_SERIALIZERS = new ConcurrentHashMap<>(); @@ -571,6 +571,8 @@ public class AccordKeyspace + "data blob, " + "PRIMARY KEY((store_id, key_token, key))" + ')') + // TODO (expected): make this uncompressed, as not very compressable (except perhaps the primary key, but could switch to operating on tokens directly) +// + " WITH compression = {'enabled':'false'};") .partitioner(FOR_KEYS_LOCAL_PARTITIONER) .build(); } @@ -992,12 +994,12 @@ public class AccordKeyspace public static UntypedResultSet loadCommandRow(CommandStore commandStore, TxnId txnId) { - String cql = "SELECT * FROM %s.%s " + + String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + COMMANDS + ' ' + "WHERE store_id = ? " + "AND domain = ? " + "AND txn_id=(?, ?, ?)"; - return executeInternal(format(cql, ACCORD_KEYSPACE_NAME, COMMANDS), + return executeInternal(cql, commandStore.id(), txnId.domain().ordinal(), txnId.msb, txnId.lsb, txnId.node.id); @@ -1404,12 +1406,12 @@ public class AccordKeyspace public static UntypedResultSet loadTimestampsForKeyRow(CommandStore commandStore, PartitionKey key) { - String cql = "SELECT * FROM %s.%s " + + String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + TIMESTAMPS_FOR_KEY + ' ' + "WHERE store_id = ? " + "AND key_token = ? " + "AND key=(?, ?)"; - return executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TIMESTAMPS_FOR_KEY), + return executeInternal(cql, commandStore.id(), serializeToken(key.token()), key.table().asUUID(), key.partitionKey().getKey()); @@ -1624,9 +1626,9 @@ public class AccordKeyspace private static EpochDiskState saveEpochDiskState(EpochDiskState diskState) { - String cql = "INSERT INTO %s.%s (key, min_epoch, max_epoch) VALUES (0, ?, ?);"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, EPOCH_METADATA), - diskState.minEpoch, diskState.maxEpoch); + String cql = "INSERT INTO " + ACCORD_KEYSPACE_NAME + '.' + EPOCH_METADATA + ' ' + + "(key, min_epoch, max_epoch) VALUES (0, ?, ?);"; + executeInternal(cql, diskState.minEpoch, diskState.maxEpoch); return diskState; } @@ -1634,7 +1636,8 @@ public class AccordKeyspace @VisibleForTesting public static EpochDiskState loadEpochDiskState() { - String cql = "SELECT * FROM %s.%s WHERE key=0"; + String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + EPOCH_METADATA + ' ' + + "WHERE key=0"; UntypedResultSet result = executeInternal(format(cql, ACCORD_KEYSPACE_NAME, EPOCH_METADATA)); if (result.isEmpty()) return null; @@ -1668,8 +1671,9 @@ public class AccordKeyspace try { - String cql = "UPDATE %s.%s SET topology=? WHERE epoch=?"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET topology=? WHERE epoch=?"; + executeInternal(cql, serialize(topology, LocalVersionedSerializers.topology), topology.epoch()); flush(Topologies); } @@ -1684,8 +1688,9 @@ public class AccordKeyspace public static EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); - String cql = "UPDATE %s.%s SET remote_sync_complete = remote_sync_complete + ? WHERE epoch = ?"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET remote_sync_complete = remote_sync_complete + ? WHERE epoch = ?"; + executeInternal(cql, Collections.singleton(node.id), epoch); flush(Topologies); return diskState; @@ -1694,8 +1699,9 @@ public class AccordKeyspace public static EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); - String cql = "UPDATE %s.%s SET closed = closed + ? WHERE epoch = ?"; - executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET closed = closed + ? WHERE epoch = ?"; + executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch); flush(Topologies); return diskState; @@ -1704,8 +1710,9 @@ public class AccordKeyspace public static EpochDiskState markRedundant(Ranges ranges, long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); - String cql = "UPDATE %s.%s SET redundant = redundant + ? WHERE epoch = ?"; - executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET redundant = redundant + ? WHERE epoch = ?"; + executeInternal(cql, KeySerializers.rangesToBlobMap(ranges), epoch); flush(Topologies); return diskState; @@ -1714,8 +1721,9 @@ public class AccordKeyspace public static EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> pending, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); - String cql = "UPDATE %s.%s SET sync_state = ?, pending_sync_notify = ? WHERE epoch = ?"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET sync_state = ?, pending_sync_notify = ? WHERE epoch = ?"; + executeInternal(cql, SyncStatus.NOTIFYING.ordinal(), pending.stream().map(i -> i.id).collect(Collectors.toSet()), epoch); @@ -1725,8 +1733,9 @@ public class AccordKeyspace public static EpochDiskState markLocalSyncAck(Node.Id node, long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); - String cql = "UPDATE %s.%s SET pending_sync_notify = pending_sync_notify - ? WHERE epoch = ?"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET pending_sync_notify = pending_sync_notify - ? WHERE epoch = ?"; + executeInternal(cql, Collections.singleton(node.id), epoch); return diskState; } @@ -1734,8 +1743,9 @@ public class AccordKeyspace public static EpochDiskState setCompletedLocalSync(long epoch, EpochDiskState diskState) { diskState = maybeUpdateMaxEpoch(diskState, epoch); - String cql = "UPDATE %s.%s SET sync_state = ?, pending_sync_notify = {} WHERE epoch = ?"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), + String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "SET sync_state = ?, pending_sync_notify = {} WHERE epoch = ?"; + executeInternal(cql, SyncStatus.COMPLETED.ordinal(), epoch); return diskState; @@ -1748,8 +1758,9 @@ public class AccordKeyspace long delete = diskState.minEpoch; diskState = diskState.withNewMinEpoch(delete + 1); saveEpochDiskState(diskState); - String cql = "DELETE FROM %s.%s WHERE epoch = ?"; - executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), delete); + String cql = "DELETE FROM " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "WHERE epoch = ?"; + executeInternal(cql, delete); } return diskState; } @@ -1762,7 +1773,8 @@ public class AccordKeyspace @VisibleForTesting public static void loadEpoch(long epoch, TopologyLoadConsumer consumer) throws IOException { - String cql = format("SELECT * FROM %s.%s WHERE epoch=?", ACCORD_KEYSPACE_NAME, TOPOLOGIES); + String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + + "WHERE epoch=?"; UntypedResultSet result = executeInternal(cql, epoch); checkState(!result.isEmpty(), "Nothing found for epoch %d", epoch); diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index 7346a6eebf..160813d722 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -347,7 +347,7 @@ public class AccordObjectSizes return size; Command.Committed committed = command.asCommitted(); - size += WaitingOnSerializer.serializedSize(committed.waitingOn); + size += WaitingOnSerializer.serializedSize(command.txnId(), committed.waitingOn); return size; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index d3954a208a..36f001162d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -296,7 +296,7 @@ public class AccordService implements IAccordService, Shutdownable public static long uniqueNow() { - // TODO (correctness, now): This is not unique it's just currentTimeMillis as microseconds + // TODO (now, correctness): This is not unique it's just currentTimeMillis as microseconds return TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis()); } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java index 070fcfa0e6..e506bbf85c 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java @@ -223,8 +223,7 @@ public class CheckStatusSerializers Writes writes = CommandSerializers.nullableWrites.deserialize(in, version); Result result = null; - if (maxKnowledgeStatus == SaveStatus.PreApplied || maxKnowledgeStatus == SaveStatus.Applied - || maxKnowledgeStatus == SaveStatus.TruncatedApply || maxKnowledgeStatus == SaveStatus.TruncatedApplyWithOutcome || maxKnowledgeStatus == SaveStatus.TruncatedApplyWithDeps) + if (maxKnowledgeStatus.known.outcome.isOrWasApply()) result = CommandSerializers.APPLIED; return createOk(map, maxKnowledgeStatus, maxStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, executeAt, diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index fbc3aeb22f..fe16d3033e 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord.serializers; import java.io.IOException; +import java.nio.ByteBuffer; import com.google.common.base.Preconditions; @@ -99,6 +100,13 @@ public class CommandSerializers TopologySerializers.nodeId.serialize(ts.node, out, version); } + public void serialize(T ts, DataOutputPlus out) throws IOException + { + out.writeLong(ts.msb); + out.writeLong(ts.lsb); + TopologySerializers.NodeIdSerializer.serialize(ts.node, out); + } + public <V> int serialize(T ts, V dst, ValueAccessor<V> accessor, int offset) { int position = offset; @@ -110,6 +118,13 @@ public class CommandSerializers return size; } + public void serialize(T ts, ByteBuffer out) + { + out.putLong(ts.msb); + out.putLong(ts.lsb); + TopologySerializers.nodeId.serialize(ts.node, out); + } + @Override public T deserialize(DataInputPlus in, int version) throws IOException { @@ -118,6 +133,13 @@ public class CommandSerializers TopologySerializers.nodeId.deserialize(in, version)); } + public T deserialize(DataInputPlus in) throws IOException + { + return factory.create(in.readLong(), + in.readLong(), + TopologySerializers.NodeIdSerializer.deserialize(in)); + } + public <V> T deserialize(V src, ValueAccessor<V> accessor, int offset) { long msb = accessor.getLong(src, offset); @@ -128,6 +150,16 @@ public class CommandSerializers return factory.create(msb, lsb, node); } + public T deserialize(ByteBuffer buffer, int position) + { + long msb = buffer.getLong(position); + position += TypeSizes.LONG_SIZE; + long lsb = buffer.getLong(position); + position += TypeSizes.LONG_SIZE; + Node.Id node = TopologySerializers.nodeId.deserialize(buffer, position); + return factory.create(msb, lsb, node); + } + @Override public long serializedSize(T ts, int version) { diff --git a/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java index 370f88b73d..4512776154 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java @@ -178,17 +178,8 @@ public class FetchSerializers Writes writes = CommandSerializers.nullableWrites.deserialize(in, version); Result result = null; - switch (maxSaveStatus) - { - case PreApplied: - case Applying: - case Applied: - case TruncatedApply: - case TruncatedApplyWithOutcome: - case TruncatedApplyWithDeps: - result = CommandSerializers.APPLIED; - break; - } + if (achieved.outcome.isOrWasApply()) + result = CommandSerializers.APPLIED; return Propagate.SerializerSupport.create(txnId, route, diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java index c5c2f9a382..4693c03c5c 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord.serializers; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -46,7 +47,7 @@ public class TopologySerializers { private NodeIdSerializer() {} - private static void serialize(Node.Id id, DataOutputPlus out) throws IOException + public static void serialize(Node.Id id, DataOutputPlus out) throws IOException { out.writeInt(id.id); } @@ -68,7 +69,12 @@ public class TopologySerializers return accessor.putInt(dst, offset, id.id); } - private static Node.Id deserialize(DataInputPlus in) throws IOException + public void serialize(Node.Id id, ByteBuffer out) + { + out.putInt(id.id); + } + + public static Node.Id deserialize(DataInputPlus in) throws IOException { return new Node.Id(in.readInt()); } @@ -90,6 +96,11 @@ public class TopologySerializers return new Node.Id(accessor.getInt(src, offset)); } + public <V> Node.Id deserialize(ByteBuffer src, int position) + { + return new Node.Id(src.getInt(position)); + } + public int serializedSize() { return TypeSizes.INT_SIZE; // id.id diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java index 3efb9e2c6c..6c22d28440 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java @@ -21,9 +21,11 @@ package org.apache.cassandra.service.accord.serializers; import java.io.IOException; import java.nio.ByteBuffer; +import accord.local.Command; import accord.local.Command.WaitingOn; import accord.primitives.Keys; import accord.primitives.Routable; +import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.utils.ImmutableBitSet; import accord.utils.Invariants; @@ -32,14 +34,18 @@ import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.vint.VIntCoding; public class WaitingOnSerializer { public static void serialize(TxnId txnId, WaitingOn waitingOn, DataOutputPlus out) throws IOException { - out.writeUnsignedVInt32(waitingOn.keys.size()); - out.writeUnsignedVInt32(waitingOn.txnIds.size()); + if (txnId.kind().awaitsOnlyDeps()) + { + Timestamp executeAtLeast = waitingOn.executeAtLeast(); + out.writeBoolean(executeAtLeast != null); + if (executeAtLeast != null) + CommandSerializers.timestamp.serialize(executeAtLeast, out); + } int keyCount = waitingOn.keys.size(); int txnIdCount = waitingOn.txnIds.size(); int waitingOnLength = (txnIdCount + keyCount + 63) / 64; @@ -53,8 +59,12 @@ public class WaitingOnSerializer public static WaitingOn deserialize(TxnId txnId, Keys keys, SortedArrayList<TxnId> txnIds, DataInputPlus in) throws IOException { - int a = in.readUnsignedVInt32(); - int b = in.readUnsignedVInt32(); + Timestamp executeAtLeast = null; + if (txnId.kind().awaitsOnlyDeps()) + { + if (in.readBoolean()) + executeAtLeast = CommandSerializers.timestamp.deserialize(in); + } int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64; ImmutableBitSet waitingOn = deserialize(waitingOnLength, in); ImmutableBitSet appliedOrInvalidated = null; @@ -63,17 +73,26 @@ public class WaitingOnSerializer int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64; appliedOrInvalidated = deserialize(appliedOrInvalidatedLength, in); } - return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated); + + WaitingOn result = new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated); + if (executeAtLeast != null) + result = new Command.WaitingOnWithExecuteAt(result, executeAtLeast); + return result; } - public static long serializedSize(WaitingOn waitingOn) + public static long serializedSize(TxnId txnId, WaitingOn waitingOn) { int keyCount = waitingOn.keys.size(); int txnIdCount = waitingOn.txnIds.size(); int waitingOnLength = (txnIdCount + keyCount + 63) / 64; long size = serializedSize(waitingOnLength, waitingOn.waitingOn); - size += TypeSizes.sizeofUnsignedVInt(keyCount); - size += TypeSizes.sizeofUnsignedVInt(txnIdCount); + if (txnId.kind().awaitsOnlyDeps()) + { + Timestamp executeAtLeast = waitingOn.executeAtLeast(); + size += 1; + if (executeAtLeast != null) + size += CommandSerializers.timestamp.serializedSize(); + } if (waitingOn.appliedOrInvalidated == null) return size; @@ -113,10 +132,24 @@ public class WaitingOnSerializer if (txnId.domain() == Routable.Domain.Range) appliedOrInvalidatedLength = (txnIdCount + 63) / 64; - ByteBuffer out = ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(keyCount) + TypeSizes.sizeofUnsignedVInt(txnIdCount) - + TypeSizes.LONG_SIZE * (waitingOnLength + appliedOrInvalidatedLength)); - VIntCoding.writeUnsignedVInt32(keyCount, out); - VIntCoding.writeUnsignedVInt32(txnIdCount, out); + int size = TypeSizes.LONG_SIZE * (waitingOnLength + appliedOrInvalidatedLength); + Timestamp executeAtLeast = null; + if (txnId.kind().awaitsOnlyDeps()) + { + executeAtLeast = waitingOn.executeAtLeast(); + size += 1; + if (executeAtLeast != null) + size += CommandSerializers.timestamp.serializedSize(); + } + + ByteBuffer out = ByteBuffer.allocate(size); + if (txnId.kind().awaitsOnlyDeps()) + { + out.put((byte)(executeAtLeast != null ? 1 : 0)); + if (executeAtLeast != null) + CommandSerializers.timestamp.serialize(executeAtLeast, out); + } + serialize(waitingOnLength, waitingOn.waitingOn, out); if (appliedOrInvalidatedLength > 0) serialize(appliedOrInvalidatedLength, waitingOn.appliedOrInvalidated, out); @@ -133,12 +166,18 @@ public class WaitingOnSerializer public static WaitingOn deserialize(TxnId txnId, Keys keys, SortedArrayList<TxnId> txnIds, ByteBuffer in) throws IOException { - int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64; int position = in.position(); - int a = VIntCoding.readUnsignedVInt32(in, position); - position += TypeSizes.sizeofUnsignedVInt(a); - int b = VIntCoding.readUnsignedVInt32(in, position); - position += TypeSizes.sizeofUnsignedVInt(a); + Timestamp executeAtLeast = null; + if (txnId.kind().awaitsOnlyDeps()) + { + if (in.get(position++) != 0) + { + executeAtLeast = CommandSerializers.timestamp.deserialize(in, position); + position += CommandSerializers.timestamp.serializedSize(); + } + } + + int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64; ImmutableBitSet waitingOn = deserialize(position, waitingOnLength, in); ImmutableBitSet appliedOrInvalidated = null; if (txnId.domain() == Routable.Domain.Range) @@ -147,7 +186,11 @@ public class WaitingOnSerializer int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64; appliedOrInvalidated = deserialize(position, appliedOrInvalidatedLength, in); } - return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated); + + WaitingOn result = new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated); + if (executeAtLeast != null) + result = new Command.WaitingOnWithExecuteAt(result, executeAtLeast); + return result; } private static ImmutableBitSet deserialize(int position, int length, ByteBuffer in) diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 7965decddb..009930955b 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -1000,7 +1000,7 @@ public class ByteBufferUtil public static void writeLeastSignificantBytes(long register, int bytes, ByteBuffer out) { - writeMostSignificantBytesSlow(register << ((8 - bytes)*8), bytes, out); + writeMostSignificantBytes(register << ((8 - bytes)*8), bytes, out); } public static void writeMostSignificantBytes(long register, int bytes, ByteBuffer out) diff --git a/test/conf/logback-simulator.xml b/test/conf/logback-simulator.xml index a4c24aab8d..fe823383ee 100644 --- a/test/conf/logback-simulator.xml +++ b/test/conf/logback-simulator.xml @@ -16,7 +16,7 @@ ~ limitations under the License. --> -<configuration debug="false" scan="true" scanPeriod="60 seconds"> +<configuration debug="true" scan="true" scanPeriod="60 seconds"> <define name="run_start" class="org.apache.cassandra.simulator.logging.RunStartDefiner" scope="system"/> <define name="run_seed" class="org.apache.cassandra.simulator.logging.SeedDefiner" scope="system"/> <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner"/> diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java index ce0e95db03..fdc276de0b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java +++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableSet; import org.junit.After; import org.junit.BeforeClass; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.Duration; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BooleanType; @@ -62,6 +63,8 @@ import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.shared.DistributedTestBase; import org.apache.cassandra.service.accord.AccordStateCache; +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.config.CassandraRelevantProperties.JOIN_RING; import static org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOTSTRAP_PROGRESS; import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_GC_INSPECTOR; @@ -81,6 +84,7 @@ public class TestBaseImpl extends DistributedTestBase @BeforeClass public static void beforeClass() throws Throwable { + CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis()))); ICluster.setup(); SKIP_GC_INSPECTOR.setBoolean(true); AccordStateCache.validateLoadOnEvict(true); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index 42e7fbf34a..8e663ab966 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -19,9 +19,15 @@ package org.apache.cassandra.distributed.test.accord; import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; +import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.util.concurrent.RateLimiter; import org.junit.BeforeClass; @@ -30,10 +36,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.distributed.api.IMessage; +import org.apache.cassandra.distributed.api.IMessageFilters; +import org.apache.cassandra.net.Verb; import org.apache.cassandra.utils.EstimatedHistogram; +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; public class AccordLoadTest extends AccordTestBase @@ -43,20 +55,35 @@ public class AccordLoadTest extends AccordTestBase @BeforeClass public static void setUp() throws IOException { - AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("lwt_strategy", "accord").set("non_serial_write_strategy", "accord")), 2); + CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis()))); + AccordTestBase.setupCluster(builder -> builder, 2); } @Ignore @Test public void testLoad() throws Exception { - test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY KEY(k))", + test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY KEY(k)) WITH transactional_mode = 'full'", cluster -> { + + final ConcurrentHashMap<Verb, AtomicInteger> verbs = new ConcurrentHashMap<>(); + cluster.filters().outbound().messagesMatching(new IMessageFilters.Matcher() + { + @Override + public boolean matches(int i, int i1, IMessage iMessage) + { + verbs.computeIfAbsent(Verb.fromId(iMessage.verb()), ignore -> new AtomicInteger()).incrementAndGet(); + return false; + } + }).drop(); ICoordinator coordinator = cluster.coordinator(1); + final int repairInterval = 3000; final int batchSize = 1000; final int concurrency = 100; final int ratePerSecond = 1000; final int keyCount = 10; + final float readChance = 0.33f; + long nextRepairAt = repairInterval; for (int i = 1; i <= keyCount; i++) coordinator.execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (0, 0) USING TIMESTAMP 0;", ConsistencyLevel.ALL, i); @@ -75,14 +102,66 @@ public class AccordLoadTest extends AccordTestBase inFlight.acquire(); rateLimiter.acquire(); long commandStart = System.nanoTime(); - coordinator.executeWithResult((success, fail) -> { - inFlight.release(); - if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart)); -// else exceptions.add(fail); - }, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount)); + if (random.nextFloat() < readChance) + { + coordinator.executeWithResult((success, fail) -> { + inFlight.release(); + if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart)); + // else exceptions.add(fail); + }, "SELECT * FROM " + qualifiedTableName + " WHERE k = ?;", ConsistencyLevel.SERIAL, random.nextInt(keyCount)); + } + else + { + coordinator.executeWithResult((success, fail) -> { + inFlight.release(); + if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart)); + // else exceptions.add(fail); + }, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount)); + } + } + + if ((nextRepairAt -= batchSize) <= 0) + { + nextRepairAt += repairInterval; + System.out.println("repairing..."); + cluster.coordinator(1).instance().nodetool("repair", qualifiedTableName); + } + + final Date date = new Date(); + System.out.printf("%tT rate: %.2f/s\n", date, (((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart))); + System.out.printf("%tT percentiles: %d %d %d %d\n", date, histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000); + + class VerbCount + { + final Verb verb; + final int count; + + VerbCount(Verb verb, int count) + { + this.verb = verb; + this.count = count; + } + } + List<VerbCount> verbCounts = new ArrayList<>(); + for (Map.Entry<Verb, AtomicInteger> e : verbs.entrySet()) + { + int count = e.getValue().getAndSet(0); + if (count != 0) verbCounts.add(new VerbCount(e.getKey(), count)); + } + verbCounts.sort(Comparator.comparing(v -> -v.count)); + + StringBuilder verbSummary = new StringBuilder(); + for (VerbCount vs : verbCounts) + { + { + if (verbSummary.length() > 0) + verbSummary.append(", "); + verbSummary.append(vs.verb); + verbSummary.append(": "); + verbSummary.append(vs.count); + } } - System.out.printf("%tT rate: %.2f/s\n", new Date(), (((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart))); - System.out.printf("%tT percentiles: %d %d %d %d\n", new Date(), histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000); + System.out.printf("%tT verbs: %s\n", date, verbSummary); } } ); diff --git a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java index 60a71b4b3b..8ff2aefe74 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java @@ -60,6 +60,7 @@ import org.apache.cassandra.io.compress.LZ4Compressor; import org.apache.cassandra.io.filesystem.ListenableFileSystem; import org.apache.cassandra.io.util.FileSystems; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.service.paxos.BallotGenerator; import org.apache.cassandra.service.paxos.PaxosPrepare; import org.apache.cassandra.simulator.RandomSource.Choices; @@ -198,7 +199,7 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable protected HeapPool.Logged.Listener memoryListener; protected SimulatedTime.Listener timeListener = (i1, i2) -> {}; protected LongConsumer onThreadLocalRandomCheck; - protected String lwtStrategy = "migration"; + protected String transactionalMode = "full"; public Builder<S> failures(Failures failures) { @@ -575,12 +576,17 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable return this; } - public Builder<S> lwtStrategy(String strategy) + public Builder<S> transactionalMode(String mode) { - this.lwtStrategy = strategy; + this.transactionalMode = mode; return this; } + public TransactionalMode transactionalMode() + { + return TransactionalMode.fromString(transactionalMode); + } + public abstract ClusterSimulation<S> create(long seed) throws IOException; } @@ -774,7 +780,6 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable .set("use_deterministic_table_id", true) .set("disk_access_mode", "standard") .set("failure_detector", SimulatedFailureDetector.Instance.class.getName()) - .set("lwt_strategy", builder.lwtStrategy) .set("commitlog_compression", new ParameterizedClass(LZ4Compressor.class.getName(), emptyMap())); ; configUpdater.accept(threadAllocator.update(config)); @@ -866,6 +871,7 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable simulated.register((SimulatedFutureActionScheduler) futureActionScheduler); scheduler = builder.schedulerFactory.create(random); + // TODO (required): we aren't passing paxos variant change parameter anymore options = new ClusterActions.Options(builder.topologyChangeLimit, Choices.uniform(KindOfSequence.values()).choose(random).period(builder.topologyChangeIntervalNanos, random), Choices.random(random, builder.topologyChanges), builder.consensusChangeLimit, Choices.uniform(KindOfSequence.values()).choose(random).period(builder.consensusChangeIntervalNanos, random), diff --git a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java index 05f3588911..e5c875cb6e 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java +++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java @@ -344,7 +344,7 @@ public class SimulationRunner builder.debug(debugLevels, debugPrimaryKeys); } - Optional.ofNullable(lwtStrategy).ifPresent(builder::lwtStrategy); + Optional.ofNullable(lwtStrategy).ifPresent(builder::transactionalMode); } public void run(B builder) throws IOException diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java index a75a1ef461..ee8fd0ca49 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java @@ -71,11 +71,11 @@ class AccordClusterSimulation extends ClusterSimulation<PaxosSimulation> impleme int[] primaryKeys = primaryKeys(seed, builder.primaryKeyCount()); KindOfSequence.Period jitter = RandomSource.Choices.uniform(KindOfSequence.values()).choose(random) .period(builder.schedulerJitterNanos(), random); - return new PairOfSequencesAccordSimulation(simulated, cluster, options, - builder.readChance().select(random), builder.concurrency(), builder.primaryKeySeconds(), builder.withinKeyConcurrency(), - SERIAL, schedulers, builder.debug(), seed, - primaryKeys, builder.secondsToSimulate() >= 0 ? SECONDS.toNanos(builder.secondsToSimulate()) : -1, - () -> jitter.get(random)); + return new PairOfSequencesAccordSimulation(simulated, cluster, options, builder.transactionalMode(), + builder.readChance().select(random), builder.concurrency(), builder.primaryKeySeconds(), builder.withinKeyConcurrency(), + SERIAL, schedulers, builder.debug(), seed, + primaryKeys, builder.secondsToSimulate() >= 0 ? SECONDS.toNanos(builder.secondsToSimulate()) : -1, + () -> jitter.get(random)); }); } diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java index 8d6c8a0dcc..7965c29fc1 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java @@ -50,6 +50,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.impl.Query; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.simulator.Action; import org.apache.cassandra.simulator.Debug; import org.apache.cassandra.simulator.RunnableActionScheduler; @@ -123,10 +124,12 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo private final float writeRatio; private final HistoryValidator validator; + private final TransactionalMode transactionalMode; public PairOfSequencesAccordSimulation(SimulatedSystems simulated, Cluster cluster, ClusterActions.Options clusterOptions, + TransactionalMode transactionalMode, float readRatio, int concurrency, IntRange simulateKeyForSeconds, IntRange withinKeyConcurrency, ConsistencyLevel serialConsistency, RunnableActionScheduler scheduler, Debug debug, @@ -139,6 +142,7 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo scheduler, debug, seed, primaryKeys, runForNanos, jitter); + this.transactionalMode = transactionalMode; this.writeRatio = 1F - readRatio; HistoryValidator validator = new StrictSerializabilityValidator(primaryKeys); if (CassandraRelevantProperties.TEST_HISTORY_VALIDATOR_LOGGING_ENABLED.getBoolean()) @@ -149,7 +153,7 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo @Override protected String createTableStmt() { - return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk))"; + return String.format("CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk)) WITH transactional_mode = '%s'", transactionalMode); } @Override diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java index 6e7d058bea..c54ba1c26d 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java @@ -69,11 +69,6 @@ class PaxosClusterSimulation extends ClusterSimulation<PaxosSimulation> implemen random.reset(seed); return new PaxosClusterSimulation(random, seed, uniqueNum, this); } - - public String lwtStrategy() - { - return lwtStrategy; - } } PaxosClusterSimulation(RandomSource random, long seed, int uniqueNum, Builder builder) throws IOException diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java index 71734c6e68..095c79769a 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java @@ -69,7 +69,7 @@ public class PaxosSimulationRunner extends SimulationRunner @Override protected void run( long seed, PaxosClusterSimulation.Builder builder) throws IOException { - if (Objects.equals(builder.lwtStrategy(), "accord")) + if (Objects.equals(builder.transactionalMode(), "accord")) { // Apply handicaps builder.dcs(new IntRange(1, 1)); diff --git a/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java b/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java index 5b83ee88f1..b5df2b6b22 100644 --- a/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java +++ b/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java @@ -57,9 +57,9 @@ public class SyncedOffsetsTest Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1); - SyncedOffsets active = SyncedOffsets.active(descriptor, syncOnMark); + SyncedOffsets active = SyncedOffsets.active(descriptor); for (int i = 0; i < n; i++) - active.mark(i); + active.mark(i, syncOnMark); assertEquals(n - 1, active.syncedOffset()); active.close(); diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java index 3df7d87e0d..07b626346a 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java @@ -57,7 +57,7 @@ public class WaitingOnSerializerTest TxnId txnId = TxnId.NONE; if (waitingOn.appliedOrInvalidated != null) txnId = new TxnId(txnId.epoch(), txnId.hlc(), txnId.kind(), Routable.Domain.Range, txnId.node); buffer.clear(); - long expectedSize = WaitingOnSerializer.serializedSize(waitingOn); + long expectedSize = WaitingOnSerializer.serializedSize(txnId, waitingOn); WaitingOnSerializer.serialize(txnId, waitingOn, buffer); Assertions.assertThat(buffer.getLength()).isEqualTo(expectedSize); Command.WaitingOn read = WaitingOnSerializer.deserialize(txnId, waitingOn.keys, waitingOn.txnIds, new DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org