This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new a0f7da9b23 Introduce Periodic mode to Accord Journal
a0f7da9b23 is described below

commit a0f7da9b2355d9cf7fddea59c882a76f4acbe73f
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Thu May 23 14:20:09 2024 +0100

    Introduce Periodic mode to Accord Journal
    
    patch by Benedict; reviewed by Aleksey Yeschenko, Alex Petrov and David Capwell for CASSANDRA-19720
---
 .../org/apache/cassandra/config/AccordSpec.java    |   4 +-
 .../cassandra/config/DatabaseDescriptor.java       |   3 -
 .../apache/cassandra/journal/ActiveSegment.java    |  37 ++-
 src/java/org/apache/cassandra/journal/Flusher.java | 312 +++++++++++++++------
 src/java/org/apache/cassandra/journal/Journal.java |  53 +++-
 .../apache/cassandra/journal/SegmentWriter.java    |   4 +-
 .../org/apache/cassandra/journal/Segments.java     |  15 +
 .../apache/cassandra/journal/SyncedOffsets.java    |  36 ++-
 .../service/accord/AccordCommandStore.java         |   1 +
 .../cassandra/service/accord/AccordJournal.java    |   8 +-
 .../cassandra/service/accord/AccordKeyspace.java   |  68 +++--
 .../service/accord/AccordObjectSizes.java          |   2 +-
 .../cassandra/service/accord/AccordService.java    |   2 +-
 .../accord/serializers/CheckStatusSerializers.java |   3 +-
 .../accord/serializers/CommandSerializers.java     |  32 +++
 .../accord/serializers/FetchSerializers.java       |  13 +-
 .../accord/serializers/TopologySerializers.java    |  15 +-
 .../accord/serializers/WaitingOnSerializer.java    |  81 ++++--
 .../org/apache/cassandra/utils/ByteBufferUtil.java |   2 +-
 test/conf/logback-simulator.xml                    |   2 +-
 .../cassandra/distributed/test/TestBaseImpl.java   |   4 +
 .../distributed/test/accord/AccordLoadTest.java    |  97 ++++++-
 .../cassandra/simulator/ClusterSimulation.java     |  14 +-
 .../cassandra/simulator/SimulationRunner.java      |   2 +-
 .../simulator/paxos/AccordClusterSimulation.java   |  10 +-
 .../paxos/PairOfSequencesAccordSimulation.java     |   6 +-
 .../simulator/paxos/PaxosClusterSimulation.java    |   5 -
 .../simulator/paxos/PaxosSimulationRunner.java     |   2 +-
 .../cassandra/journal/SyncedOffsetsTest.java       |   4 +-
 .../serializers/WaitingOnSerializerTest.java       |   2 +-
 30 files changed, 620 insertions(+), 219 deletions(-)
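
For orientation before the diff: in the new PERIODIC mode the flusher thread pushes writes to the OS page cache every flushPeriod, a dedicated fsync thread makes them durable, and writers only block once durability lags by more than periodicFlushLagBlock. The sketch below is illustrative only — the class name and fields are hypothetical, and the real implementation is in Flusher.java further down.

    // Simplified model of the PERIODIC flush contract (not part of the patch)
    import java.util.concurrent.locks.LockSupport;

    final class PeriodicFlusherSketch
    {
        private final long flushPeriodNanos = 1_000_000_000L;  // illustrative 1s period
        private final long lagBlockNanos = 1_500_000_000L;     // mirrors periodicFlushLagBlock = 1500ms
        private volatile long fsyncFinishedFor = System.nanoTime();

        // called by the fsync thread after each durable sync, stamped with the
        // time the corresponding flush started
        void onFsyncComplete(long flushStartedAt)
        {
            fsyncFinishedFor = flushStartedAt;
        }

        // called by writers: parks only if fsync has fallen too far behind
        void maybeBlockWriter()
        {
            long mustBeDurableBy = System.nanoTime() - lagBlockNanos;
            while (fsyncFinishedFor < mustBeDurableBy)
                LockSupport.parkNanos(flushPeriodNanos); // crude wait; the patch uses a WaitQueue
        }
    }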

diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index b035b0b9b5..2e0d614957 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -30,7 +30,7 @@ public class AccordSpec
 
     public volatile OptionaldPositiveInt shard_count = OptionaldPositiveInt.UNDEFINED;
 
-    public volatile DurationSpec.IntSecondsBound progress_log_schedule_delay = new DurationSpec.IntSecondsBound(1);
+    public volatile DurationSpec.IntMillisecondsBound progress_log_schedule_delay = new DurationSpec.IntMillisecondsBound(100);
 
     /**
      * When a barrier transaction is requested how many times to repeat attempting the barrier before giving up
@@ -79,7 +79,7 @@ public class AccordSpec
     {
         public int segmentSize = 32 << 20;
         public FailurePolicy failurePolicy = FailurePolicy.STOP;
-        public FlushMode flushMode = FlushMode.BATCH;
+        public FlushMode flushMode = FlushMode.PERIODIC;
         public DurationSpec.IntMillisecondsBound flushPeriod; // pulls default from 'commitlog_sync_period'
         public DurationSpec.IntMillisecondsBound periodicFlushLagBlock = new DurationSpec.IntMillisecondsBound("1500ms");
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 285d7308b7..c98182cdd3 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -154,9 +154,6 @@ import static org.apache.cassandra.utils.Clock.Global.logInitializationOutcome;
 
 public class DatabaseDescriptor
 {
-    public static final String NO_ACCORD_PAXOS_STRATEGY_WITH_ACCORD_DISABLED_MESSAGE =
-    "Cannot use lwt_strategy \"accord\" while Accord transactions are disabled.";
-
     static
     {
         // This static block covers most usages
diff --git a/src/java/org/apache/cassandra/journal/ActiveSegment.java b/src/java/org/apache/cassandra/journal/ActiveSegment.java
index f16126c157..1fd2e4dd1a 100644
--- a/src/java/org/apache/cassandra/journal/ActiveSegment.java
+++ b/src/java/org/apache/cassandra/journal/ActiveSegment.java
@@ -24,6 +24,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
 
 import com.codahale.metrics.Timer;
@@ -50,6 +51,9 @@ final class ActiveSegment<K, V> extends Segment<K, V>
      * Everything before this offset has been written and flushed.
      */
     private volatile int lastFlushedOffset = 0;
+    private volatile int lastFsyncOffset = 0;
+    @SuppressWarnings("rawtypes")
+    private static final AtomicIntegerFieldUpdater<ActiveSegment> lastFsyncOffsetUpdater = AtomicIntegerFieldUpdater.newUpdater(ActiveSegment.class, "lastFsyncOffset");
 
     /*
      * End position of the buffer; initially set to its capacity and
@@ -86,7 +90,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
     @SuppressWarnings("resource")
     static <K, V> ActiveSegment<K, V> create(Descriptor descriptor, Params params, KeySupport<K> keySupport)
     {
-        SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true);
+        SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor);
         InMemoryIndex<K> index = InMemoryIndex.create(keySupport);
         Metadata metadata = Metadata.create();
         return new ActiveSegment<>(descriptor, params, syncedOffsets, index, metadata, keySupport);
@@ -152,7 +156,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         boolean isEmpty = discardUnusedTail();
         if (!isEmpty)
         {
-            flush();
+            flush(true);
             if (persistComponents) persistComponents();
         }
         release();
@@ -261,21 +265,37 @@ final class ActiveSegment<K, V> extends Segment<K, V>
      * TODO FIXME: calls from outside Flusher + callbacks
      * @return last synced offset
      */
-    synchronized int flush()
+    synchronized int flush(boolean fsync)
     {
         int allocatePosition = this.allocatePosition.get();
         if (lastFlushedOffset >= allocatePosition)
             return lastFlushedOffset;
 
         waitForModifications();
-        flushInternal();
+        if (fsync)
+        {
+            fsyncInternal();
+            lastFsyncOffsetUpdater.accumulateAndGet(this, allocatePosition, Math::max);
+        }
         lastFlushedOffset = allocatePosition;
         int syncedOffset = Math.min(allocatePosition, endOfBuffer);
-        syncedOffsets.mark(syncedOffset);
+        syncedOffsets.mark(syncedOffset, fsync);
         flushComplete.signalAll();
         return syncedOffset;
     }
 
+    // provides no ordering guarantees
+    void fsync()
+    {
+        int lastFlushed = lastFlushedOffset;
+        if (lastFsyncOffset >= lastFlushed)
+            return;
+
+        fsyncInternal();
+        syncedOffsets.fsync();
+        lastFsyncOffsetUpdater.accumulateAndGet(this, lastFlushed, Math::max);
+    }
+
     private void waitForFlush(int position)
     {
         while (lastFlushedOffset < position)
@@ -297,7 +317,7 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         appendOrder.awaitNewBarrier();
     }
 
-    private void flushInternal()
+    private void fsyncInternal()
     {
         try
         {
@@ -314,6 +334,11 @@ final class ActiveSegment<K, V> extends Segment<K, V>
         return syncedOffset >= endOfBuffer;
     }
 
+    boolean isCompletedAndFullyFsynced()
+    {
+        return lastFsyncOffset >= endOfBuffer;
+    }
+
     /**
      * Ensures no more of this segment is writeable, by allocating any unused section at the end
      * and marking it discarded
      */
     void discardUnusedTail()
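
The lastFsyncOffset bookkeeping above relies on a standard pattern worth spelling out: AtomicIntegerFieldUpdater.accumulateAndGet with Math::max advances a volatile int monotonically under concurrency. A self-contained sketch (hypothetical class name, not part of the patch):

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

    final class OffsetTracker
    {
        private volatile int lastFsyncOffset = 0;
        private static final AtomicIntegerFieldUpdater<OffsetTracker> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(OffsetTracker.class, "lastFsyncOffset");

        // safe to call from racing threads; the offset never moves backwards
        void advanceTo(int offset)
        {
            UPDATER.accumulateAndGet(this, offset, Math::max);
        }

        int current()
        {
            return lastFsyncOffset;
        }
    }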
diff --git a/src/java/org/apache/cassandra/journal/Flusher.java b/src/java/org/apache/cassandra/journal/Flusher.java
index c4c6d75348..607943b635 100644
--- a/src/java/org/apache/cassandra/journal/Flusher.java
+++ b/src/java/org/apache/cassandra/journal/Flusher.java
@@ -17,12 +17,15 @@
  */
 package org.apache.cassandra.journal;
 
-import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.utils.Invariants;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.concurrent.Interruptible.TerminateException;
@@ -33,7 +36,6 @@ import org.apache.cassandra.utils.concurrent.Semaphore;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 import static java.lang.String.format;
-import static java.util.Comparator.comparing;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -42,7 +44,7 @@ import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SY
 import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
 import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
-import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
+import static org.apache.cassandra.journal.Params.FlushMode.PERIODIC;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.MonotonicClock.Global.preciseTime;
 import static org.apache.cassandra.utils.Simulate.With.GLOBAL_CLOCK;
@@ -60,16 +62,19 @@ final class Flusher<K, V>
     private final AsyncCallbacks<K, V> callbacks;
 
     private volatile Interruptible flushExecutor;
+    private volatile Interruptible fsyncExecutor;
 
     // counts of total pending write and written entries
     private final AtomicLong pending = new AtomicLong(0);
     private final AtomicLong written = new AtomicLong(0);
 
-    // all Allocations written before this time will be flushed
-    volatile long lastFlushedAt = currentTimeMillis();
+    // the time of the last initiated flush
+    volatile long flushStartedAt = nanoTime();
+    // the time of the earliest flush that has completed an fsync; all Allocations written before this time are durable
+    volatile long fsyncFinishedFor = flushStartedAt;
 
     // a signal that writers can wait on to be notified of a completed flush in PERIODIC FlushMode
-    private final WaitQueue flushComplete = newWaitQueue();
+    private final WaitQueue fsyncComplete = newWaitQueue(); // TODO (expected): this is only used for testing, can we remove this?
 
     // a signal and flag that callers outside the flusher thread can use
     // to signal they want the journal segments to be flushed to disk
@@ -97,20 +102,144 @@ final class Flusher<K, V>
     void shutdown()
     {
         flushExecutor.shutdown();
+        if (fsyncExecutor != null)
+            fsyncExecutor.shutdown();
     }
 
     @Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT})
     private class FlushRunnable implements Interruptible.Task
     {
-        private final MonotonicClock clock;
+        @Simulate(with={MONITORS,GLOBAL_CLOCK,LOCK_SUPPORT})
+        private class FSyncRunnable implements Interruptible.Task
+        {
+            // this is written only by the Flusher thread, and read only by the Fsync thread
+            ActiveSegment<K, V> fsyncUpTo;
+            ActiveSegment<K, V> fsyncing;
+
+            private volatile Thread awaitingWork;
+
+            // all Allocations written before this time will be written to at least the OS page cache;
+            volatile long fsyncWaitingSince = 0;
+            // the time of the earliest flush that has begun participating in an fsync
+            volatile long fsyncStartedFor = 0;
+
+            @Override
+            public void run(Interruptible.State state) throws InterruptedException
+            {
+                try
+                {
+                    doRun(state);
+                }
+                catch (Throwable t)
+                {
+                    if (!journal.handleError("Failed to flush segments to 
disk", t))
+                        throw new TerminateException();
+                }
+            }
+
+            private void awaitWork() throws InterruptedException
+            {
+                long lastStartedAt = fsyncStartedFor;
+                if (fsyncWaitingSince != lastStartedAt)
+                    return;
+
+                awaitingWork = Thread.currentThread();
+                do
+                {
+                    if (Thread.interrupted())
+                    {
+                        awaitingWork = null;
+                        throw new InterruptedException();
+                    }
+
+                    LockSupport.park();
+                }
+                while (fsyncWaitingSince == lastStartedAt);
+
+                awaitingWork = null;
+            }
+
+            void notify(Thread notify)
+            {
+                if (notify != null)
+                    LockSupport.unpark(notify);
+            }
+
+            public void doRun(Interruptible.State state) throws InterruptedException
+            {
+                awaitWork();
+                if (fsyncing == null)
+                    fsyncing = journal.oldestActiveSegment();
+
+                // invert order of access; we might see a future fsyncTo, but at worst this means redundantly invoking fsync before updating fsyncStartedFor
+                long startedAt = fsyncWaitingSince;
+                ActiveSegment<K, V> fsyncTo = this.fsyncUpTo;
+                fsyncStartedFor = startedAt;
+                // synchronized to prevent thread interrupts while performing IO operations and also
+                // clear interrupted status to prevent ClosedByInterruptException in ActiveSegment::flush
+                synchronized (this)
+                {
+                    boolean ignore = Thread.interrupted();
+                    while (fsyncing != fsyncTo)
+                    {
+                        fsyncing.fsync();
+                        journal.closeActiveSegmentAndOpenAsStatic(fsyncing);
+                        fsyncing = journal.getActiveSegment(fsyncing.descriptor.timestamp + 1);
+                    }
+                    fsyncing.fsync();
+                }
+                fsyncFinishedFor = startedAt;
+                fsyncComplete.signalAll();
+                long finishedAt = clock.now();
+                processDuration(startedAt, finishedAt);
+            }
+
+            void afterFlush(long startedAt, ActiveSegment<K, V> segment, int syncedOffset)
+            {
+                long requireFsyncTo = startedAt - periodicFlushLagBlockNanos();
+
+                fsyncUpTo = segment;
+                fsyncWaitingSince = startedAt;
+
+                notify(awaitingWork);
+
+                if (requireFsyncTo > fsyncFinishedFor)
+                    awaitFsyncAt(requireFsyncTo, journal.metrics.waitingOnFlush.time());
+                callbacks.onFlush(segment.descriptor.timestamp, syncedOffset);
+            }
+
+            private void doNoOpFlush(long startedAt)
+            {
+                if (fsyncFinishedFor >= fsyncWaitingSince)
+                {
+                    fsyncFinishedFor = startedAt;
+                }
+                else
+                {
+                    // if the flusher is still running, update the waitingSince register
+                    fsyncWaitingSince = startedAt;
+                    notify(awaitingWork);
+                }
+            }
+        }
+
         private final NoSpamLogger noSpamLogger;
+        private final MonotonicClock clock;
+        private final @Nullable FSyncRunnable fSyncRunnable;
+
+        private ActiveSegment<K, V> current = null;
 
-        private final ArrayList<ActiveSegment<K, V>> segmentsToFlush = new ArrayList<>();
+        private long firstLaggedAt = Long.MIN_VALUE; // first lag ever or since last logged warning
+        private int fsyncCount = 0;                  // flush count since firstLaggedAt
+        private int lagCount = 0;                    // lag count since firstLaggedAt
+        private long duration = 0;                   // time spent flushing since firstLaggedAt
+        private long lagDuration = 0;                // cumulative lag since firstLaggedAt
 
         FlushRunnable(MonotonicClock clock)
         {
-            this.clock = clock;
             this.noSpamLogger = NoSpamLogger.wrap(logger, 5, MINUTES);
+            this.clock = clock;
+            this.fSyncRunnable = params.flushMode() == PERIODIC ? newFsyncRunnable() : null;
         }
 
         @Override
@@ -131,8 +260,9 @@ final class Flusher<K, V>
 
         public void doRun(Interruptible.State state) throws InterruptedException
         {
-            long startedRunAt = clock.now();
-            boolean flushToDisk = lastFlushedAt + flushPeriodNanos() <= startedRunAt || state != NORMAL || flushRequested;
+            long startedAt = clock.now();
+            long flushPeriodNanos = flushPeriodNanos();
+            boolean flushToDisk = flushStartedAt + flushPeriodNanos <= startedAt || state != NORMAL || flushRequested;
 
             // synchronized to prevent thread interrupts while performing IO operations and also
             // clear interrupted status to prevent ClosedByInterruptException in ActiveSegment::flush
@@ -142,83 +272,71 @@ final class Flusher<K, V>
                 if (flushToDisk)
                 {
                     flushRequested = false;
-                    doFlush();
-                    lastFlushedAt = startedRunAt;
-                    flushComplete.signalAll();
+                    flushStartedAt = startedAt;
+                    doFlush(startedAt);
                 }
             }
 
-            long now = clock.now();
-            if (flushToDisk)
-                processFlushDuration(startedRunAt, now);
-
             if (state == SHUTTING_DOWN)
                 return;
 
-            long flushPeriodNanos = flushPeriodNanos();
             if (flushPeriodNanos <= 0)
             {
                 haveWork.acquire(1);
             }
             else
             {
-                long wakeUpAt = startedRunAt + flushPeriodNanos;
-                if (wakeUpAt > now)
-                    haveWork.tryAcquireUntil(1, wakeUpAt);
+                long wakeUpAt = startedAt + flushPeriodNanos;
+                haveWork.tryAcquireUntil(1, wakeUpAt);
             }
         }
 
-        private void doFlush()
+        private void doFlush(long startedAt) throws InterruptedException
         {
-            journal.selectSegmentToFlush(segmentsToFlush);
-            segmentsToFlush.sort(comparing(s -> s.descriptor));
+            boolean synchronousFsync = fSyncRunnable == null;
 
-            try
-            {
-                long syncedSegment = -1;
-                int syncedOffset = -1;
+            if (current == null)
+                current = journal.oldestActiveSegment();
+            ActiveSegment<K, V> newCurrent = journal.currentActiveSegment();
 
-                for (ActiveSegment<K, V> segment : segmentsToFlush)
-                {
-                    if (!segment.shouldFlush())
-                        break;
+            if (newCurrent == current && (newCurrent == null || !newCurrent.shouldFlush()))
+            {
+                if (synchronousFsync) fsyncFinishedFor = startedAt;
+                else fSyncRunnable.doNoOpFlush(startedAt);
+                return;
+            }
 
-                    syncedSegment = segment.descriptor.timestamp;
-                    syncedOffset = segment.flush();
+            Invariants.checkState(newCurrent != null);
 
-                    // if an older segment isn't fully complete + flushed yet, don't attempt to flush any younger ones
-                    if (!segment.isCompletedAndFullyFlushed(syncedOffset))
-                        break;
+            try
+            {
+                while (current != newCurrent)
+                {
+                    current.discardUnusedTail();
+                    current.flush(synchronousFsync);
+                    if (synchronousFsync)
+                        journal.closeActiveSegmentAndOpenAsStatic(current);
+                    current = journal.getActiveSegment(current.descriptor.timestamp + 1);
                 }
+                int syncedOffset = current.flush(synchronousFsync);
 
-                // invoke the onFlush() callback once, covering entire flushed range across all flushed segments
-                if (syncedSegment != -1 && syncedOffset != -1)
-                    callbacks.onFlush(syncedSegment, syncedOffset);
+                if (synchronousFsync) afterFSync(startedAt, current.descriptor.timestamp, syncedOffset);
+                else fSyncRunnable.afterFlush(startedAt, current, syncedOffset);
             }
             catch (Throwable t)
             {
                 callbacks.onFlushFailed(t);
                 throw t;
             }
-            finally
-            {
-                segmentsToFlush.clear();
-            }
         }
 
-        private long firstLaggedAt = Long.MIN_VALUE; // first lag ever or since last logged warning
-        private int flushCount = 0;                  // flush count since firstLaggedAt
-        private int lagCount = 0;                    // lag count since firstLaggedAt
-        private long flushDuration = 0;              // time spent flushing since firstLaggedAt
-        private long lagDuration = 0;                // cumulative lag since firstLaggedAt
-
-        private void processFlushDuration(long startedFlushAt, long finishedFlushAt)
+        private void processDuration(long startedFlushAt, long finishedFsyncAt)
         {
-            flushCount++;
-            flushDuration += (finishedFlushAt - startedFlushAt);
+            fsyncCount++;
+            duration += (finishedFsyncAt - startedFlushAt);
 
             long flushPeriodNanos = flushPeriodNanos();
-            long lag = finishedFlushAt - (startedFlushAt + flushPeriodNanos);
+            long lag = finishedFsyncAt - (startedFlushAt + flushPeriodNanos);
             if (flushPeriodNanos <= 0 || lag <= 0)
                 return;
 
@@ -226,26 +344,42 @@ final class Flusher<K, V>
             lagDuration += lag;
 
             if (firstLaggedAt == Long.MIN_VALUE)
-                firstLaggedAt = finishedFlushAt;
+                firstLaggedAt = finishedFsyncAt;
 
             boolean logged =
-                noSpamLogger.warn(finishedFlushAt,
-                                  "Out of {} {} journal flushes over the past 
{}s with average duration of {}ms, " +
-                                      "{} have exceeded the configured flush 
period by an average of {}ms",
-                                  flushCount,
-                                  journal.name,
-                                  format("%.2f", (finishedFlushAt - 
firstLaggedAt) * 1e-9d),
-                                  format("%.2f", flushDuration * 1e-6d / 
flushCount),
-                                  lagCount,
-                                  format("%.2f", lagDuration * 1e-6d / 
lagCount));
+            noSpamLogger.warn(finishedFsyncAt,
+                              "Out of {} {} journal flushes over the past {}s 
with average duration of {}ms, " +
+                              "{} have exceeded the configured flush period by 
an average of {}ms",
+                              fsyncCount,
+                              journal.name,
+                              format("%.2f", (finishedFsyncAt - firstLaggedAt) 
* 1e-9d),
+                              format("%.2f", duration * 1e-6d / fsyncCount),
+                              lagCount,
+                              format("%.2f", lagDuration * 1e-6d / lagCount));
 
             if (logged) // reset metrics for next log statement
             {
                 firstLaggedAt = Long.MIN_VALUE;
-                flushCount = lagCount = 0;
-                flushDuration = lagDuration = 0;
+                fsyncCount = lagCount = 0;
+                duration = lagDuration = 0;
             }
         }
+
+        private void afterFSync(long startedAt, long syncedSegment, int syncedOffset)
+        {
+            fsyncFinishedFor = startedAt;
+            callbacks.onFlush(syncedSegment, syncedOffset);
+            fsyncComplete.signalAll();
+            long finishedAt = clock.now();
+            processDuration(startedAt, finishedAt);
+        }
+
+        private FSyncRunnable newFsyncRunnable()
+        {
+            final FSyncRunnable fSyncRunnable = new FSyncRunnable();
+            fsyncExecutor = executorFactory().infiniteLoop(journal.name + "-fsync", fSyncRunnable, SAFE, NON_DAEMON, SYNCHRONIZED);
+            return fSyncRunnable;
+        }
     }
 
     @FunctionalInterface
@@ -295,48 +429,40 @@ final class Flusher<K, V>
         written.incrementAndGet();
     }
 
-    private void asyncFlushBatch(ActiveSegment<K, V>.Allocation alloc)
+    private void waitForFlushGroup(ActiveSegment<K, V>.Allocation alloc)
     {
         pending.incrementAndGet();
-        requestExtraFlush();
-        // alloc.awaitFlush(journal.metrics.waitingOnFlush); // TODO (expected): collect async flush metrics
+        alloc.awaitFlush(journal.metrics.waitingOnFlush);
         pending.decrementAndGet();
         written.incrementAndGet();
     }
 
-    private void waitForFlushGroup(ActiveSegment<K, V>.Allocation alloc)
+    private void waitForFlushPeriodic(ActiveSegment<K, V>.Allocation ignore)
     {
-        pending.incrementAndGet();
-        alloc.awaitFlush(journal.metrics.waitingOnFlush);
-        pending.decrementAndGet();
+        long expectedFlushTime = nanoTime() - periodicFlushLagBlockNanos();
+        if (fsyncFinishedFor < expectedFlushTime)
+        {
+            pending.incrementAndGet();
+            awaitFsyncAt(expectedFlushTime, journal.metrics.waitingOnFlush.time());
+            pending.decrementAndGet();
+        }
         written.incrementAndGet();
     }
 
-    private void asyncFlushGroup(ActiveSegment<K, V>.Allocation alloc)
+    private void asyncFlushBatch(ActiveSegment<K, V>.Allocation alloc)
     {
-        pending.incrementAndGet();
-        // alloc.awaitFlush(journal.metrics.waitingOnFlush); // TODO (expected): collect async flush metrics
-        pending.decrementAndGet();
+        requestExtraFlush();
         written.incrementAndGet();
     }
 
-    private void waitForFlushPeriodic(ActiveSegment<K, V>.Allocation alloc)
+    private void asyncFlushGroup(ActiveSegment<K, V>.Allocation alloc)
     {
-        long expectedFlushTime = nanoTime() - periodicFlushLagBlockNanos();
-        if (lastFlushedAt < expectedFlushTime)
-        {
-            pending.incrementAndGet();
-            awaitFlushAt(expectedFlushTime, journal.metrics.waitingOnFlush.time());
-            pending.decrementAndGet();
-        }
         written.incrementAndGet();
     }
 
     private void asyncFlushPeriodic(ActiveSegment<K, V>.Allocation ignore)
     {
-        pending.incrementAndGet();
-        // awaitFlushAt(expectedFlushTime, journal.metrics.waitingOnFlush.time()); // TODO (expected): collect async flush metrics
-        pending.decrementAndGet();
+        requestExtraFlush();
         written.incrementAndGet();
     }
 
@@ -350,17 +476,17 @@ final class Flusher<K, V>
         haveWork.release(1);
     }
 
-    private void awaitFlushAt(long flushTime, Timer.Context context)
+    private void awaitFsyncAt(long flushTime, Timer.Context context)
     {
         do
         {
-            WaitQueue.Signal signal = flushComplete.register(context, Timer.Context::stop);
-            if (lastFlushedAt < flushTime)
+            WaitQueue.Signal signal = fsyncComplete.register(context, Timer.Context::stop);
+            if (fsyncFinishedFor < flushTime)
                 signal.awaitUninterruptibly();
             else
                 signal.cancel();
         }
-        while (lastFlushedAt < flushTime);
+        while (fsyncFinishedFor < flushTime);
     }
 
     private long flushPeriodNanos()
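
The flusher/fsync handoff above avoids locks: the fsync thread publishes itself in a volatile field and parks, and the flusher unparks it after advancing fsyncWaitingSince. A minimal model of that pattern, with hypothetical names (the real code is FSyncRunnable.awaitWork() and afterFlush() above):

    import java.util.concurrent.locks.LockSupport;

    final class HandoffSketch
    {
        private volatile Thread awaitingWork;
        private volatile long waitingSince;

        // fsync thread: park until new work is published
        void awaitWork(long lastStartedAt) throws InterruptedException
        {
            if (waitingSince != lastStartedAt)
                return; // work already published since we last looked
            awaitingWork = Thread.currentThread();
            while (waitingSince == lastStartedAt)
            {
                if (Thread.interrupted())
                {
                    awaitingWork = null;
                    throw new InterruptedException();
                }
                LockSupport.park();
            }
            awaitingWork = null;
        }

        // flusher thread: publish work, then wake the fsync thread if parked
        void publishWork(long startedAt)
        {
            waitingSince = startedAt;
            Thread t = awaitingWork;
            if (t != null)
                LockSupport.unpark(t); // unparking a running thread is harmless
        }
    }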
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index eae190a15e..aa61e5aca5 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -35,6 +35,7 @@ import java.util.zip.CRC32;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.utils.Invariants;
 import com.codahale.metrics.Timer.Context;
 import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.concurrent.Interruptible;
@@ -456,9 +457,6 @@ public class Journal<K, V> implements Shutdownable
         // signal the allocator thread to prepare a new segment
         wakeAllocator();
 
-        if (null != oldSegment)
-            closeActiveSegmentAndOpenAsStatic(oldSegment);
-
         // request that the journal be flushed out-of-band, as we've finished a segment
         flusher.requestExtraFlush();
     }
@@ -659,6 +657,53 @@ public class Journal<K, V> implements Shutdownable
         segments().selectActive(currentSegment.descriptor.timestamp, into);
     }
 
+    ActiveSegment<K, V> oldestActiveSegment()
+    {
+        ActiveSegment<K, V> current = currentSegment;
+        if (current == null)
+            return null;
+
+        ActiveSegment<K, V> oldest = segments().oldestActive();
+        if (oldest == null || oldest.descriptor.timestamp > current.descriptor.timestamp)
+            return current;
+
+        return oldest;
+    }
+
+    ActiveSegment<K, V> currentActiveSegment()
+    {
+        return currentSegment;
+    }
+
+    ActiveSegment<K, V> getActiveSegment(long timestamp)
+    {
+        // we can race with segment addition to the segments() collection, with a new segment appearing in currentSegment first
+        // since we are most likely to be requesting the currentSegment anyway, we resolve this case by checking currentSegment first
+        // and resort to the segments() collection only if we do not match
+        ActiveSegment<K, V> currentSegment = this.currentSegment;
+        if (currentSegment == null)
+            throw new IllegalArgumentException("Requested an active segment with timestamp " + timestamp + " but there is no currently active segment");
+        long currentSegmentTimestamp = currentSegment.descriptor.timestamp;
+        if (timestamp == currentSegmentTimestamp)
+        {
+            return currentSegment;
+        }
+        else if (timestamp > currentSegmentTimestamp)
+        {
+            throw new IllegalArgumentException("Requested a newer timestamp " 
+ timestamp + " than the current active segment " + currentSegmentTimestamp);
+        }
+        else
+        {
+            Segment<K, V> segment = segments().get(timestamp);
+            Invariants.checkState(segment != null, "Segment %d expected to be 
found, but neither current segment %d nor in active segments", timestamp, 
currentSegmentTimestamp);
+            if (segment == null)
+                throw new IllegalArgumentException("Request the active segment 
" + timestamp + " but this segment does not exist");
+            if (!segment.isActive())
+                throw new IllegalArgumentException("Request the active segment 
" + timestamp + " but this segment is not active");
+            return segment.asActive();
+        }
+    }
+
     /**
      * Take care of a finished active segment:
      * 1. discard tail
@@ -681,7 +726,7 @@ public class Journal<K, V> implements Shutdownable
         public void run()
         {
             activeSegment.discardUnusedTail();
-            activeSegment.flush();
+            activeSegment.flush(true);
             activeSegment.persistComponents();
             replaceCompletedSegment(activeSegment, StaticSegment.open(activeSegment.descriptor, keySupport));
             activeSegment.release();
diff --git a/src/java/org/apache/cassandra/journal/SegmentWriter.java b/src/java/org/apache/cassandra/journal/SegmentWriter.java
index 852e955b21..b8436aed66 100644
--- a/src/java/org/apache/cassandra/journal/SegmentWriter.java
+++ b/src/java/org/apache/cassandra/journal/SegmentWriter.java
@@ -101,9 +101,9 @@ final class SegmentWriter<K> implements Closeable
             throw new JournalWriteError(descriptor, file, e);
         }
 
-        try (SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor, true))
+        try (SyncedOffsets syncedOffsets = SyncedOffsets.active(descriptor))
         {
-            syncedOffsets.mark(position());
+            syncedOffsets.mark(position(), true);
         }
 
         index.persist(descriptor);
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index 0693997ef3..ca5ca47b2b 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -98,6 +98,21 @@ class Segments<K, V>
                 into.add(segment.asActive());
     }
 
+    ActiveSegment<K, V> oldestActive()
+    {
+        Segment<K, V> oldest = null;
+        for (Segment<K, V> segment : segments.values())
+            if (segment.isActive() && (oldest == null || segment.descriptor.timestamp <= oldest.descriptor.timestamp))
+                oldest = segment;
+
+        return oldest == null ? null : oldest.asActive();
+    }
+
+    Segment<K, V> get(long timestamp)
+    {
+        return segments.get(timestamp);
+    }
+
     void selectStatic(Collection<StaticSegment<K, V>> into)
     {
         for (Segment<K, V> segment : segments.values())
diff --git a/src/java/org/apache/cassandra/journal/SyncedOffsets.java b/src/java/org/apache/cassandra/journal/SyncedOffsets.java
index bee302d6d8..cd05e6f8ac 100644
--- a/src/java/org/apache/cassandra/journal/SyncedOffsets.java
+++ b/src/java/org/apache/cassandra/journal/SyncedOffsets.java
@@ -50,7 +50,9 @@ interface SyncedOffsets extends Closeable
      *
      * @param offset the offset into datafile, up to which contents have been fsynced (exclusive)
      */
-    void mark(int offset);
+    void mark(int offset, boolean fsync);
+
+    void fsync();
 
     @Override
     default void close()
@@ -60,9 +62,9 @@ interface SyncedOffsets extends Closeable
     /**
      * @return a disk-backed synced offset tracker for a new {@link ActiveSegment}
      */
-    static Active active(Descriptor descriptor, boolean syncOnMark)
+    static Active active(Descriptor descriptor)
     {
-        return new Active(descriptor, syncOnMark);
+        return new Active(descriptor);
     }
 
     /**
@@ -87,15 +89,13 @@ interface SyncedOffsets extends Closeable
     final class Active implements SyncedOffsets
     {
         private final Descriptor descriptor;
-        private final boolean syncOnMark;
 
         private final FileOutputStreamPlus output;
         private volatile int syncedOffset;
 
-        private Active(Descriptor descriptor, boolean syncOnMark)
+        private Active(Descriptor descriptor)
         {
             this.descriptor = descriptor;
-            this.syncOnMark = syncOnMark;
 
             File file = descriptor.fileFor(Component.SYNCED_OFFSETS);
             if (file.exists())
@@ -123,7 +123,7 @@ interface SyncedOffsets extends Closeable
         }
 
         @Override
-        public void mark(int offset)
+        public void mark(int offset, boolean fsync)
         {
             if (offset < syncedOffset)
                 throw new IllegalArgumentException("offset " + offset + " is 
smaller than previous mark " + offset);
@@ -142,10 +142,10 @@ interface SyncedOffsets extends Closeable
             }
 
             syncedOffset = offset;
-            if (syncOnMark) sync();
+            if (fsync) fsync();
         }
 
-        private void sync()
+        public void fsync()
         {
             try
             {
@@ -160,7 +160,7 @@ interface SyncedOffsets extends Closeable
         @Override
         public void close()
         {
-            if (!syncOnMark) sync();
+            fsync();
 
             try
             {
@@ -218,7 +218,13 @@ interface SyncedOffsets extends Closeable
         }
 
         @Override
-        public void mark(int offset)
+        public void mark(int offset, boolean fsync)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void fsync()
         {
             throw new UnsupportedOperationException();
         }
@@ -235,7 +241,13 @@ interface SyncedOffsets extends Closeable
         }
 
         @Override
-        public void mark(int offset)
+        public void mark(int offset, boolean fsync)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void fsync()
         {
             throw new UnsupportedOperationException();
         }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index c846038fd8..0f33f04d92 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -342,6 +342,7 @@ public class AccordCommandStore extends CommandStore implements CacheSize
     Runnable saveCommand(Command before, Command after)
     {
         Mutation mutation = AccordKeyspace.getCommandMutation(id, before, after, nextSystemTimestampMicros());
+        // TODO (required): make sure we test recovering when this has failed to be persisted
         return null != mutation ? mutation::applyUnsafe : null;
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 80cfdf31ea..f9daf2354d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -297,6 +297,10 @@ public class AccordJournal implements IJournal, Shutdownable
     // TODO (alexp): tests for objects that go through AccordJournal
     private class JournalCallbacks implements AsyncCallbacks<Key, Object>
     {
+        private JournalCallbacks()
+        {
+        }
+
         /**
          * Queue up the record for either frame aggregation (if a protocol message) or frame application (if a frame).
          */
@@ -352,7 +356,7 @@ public class AccordJournal implements IJournal, Shutdownable
 
         private void onFrameWriteFailed(FrameRecord frame, FrameContext context, Throwable cause)
         {
-            // TODO: panic
+            // TODO (required): panic
         }
 
         @Override
@@ -364,7 +368,7 @@ public class AccordJournal implements IJournal, Shutdownable
         @Override
         public void onFlushFailed(Throwable cause)
         {
-            // TODO: panic
+            // TODO (required): panic
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 2dd2b5d28d..b96ffa9bd0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -97,9 +97,8 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
-import org.apache.cassandra.db.Slices;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
-import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
@@ -161,6 +160,7 @@ import org.apache.cassandra.service.accord.serializers.WaitingOnSerializer;
 import org.apache.cassandra.utils.Clock.Global;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 
 import static accord.utils.Invariants.checkArgument;
@@ -201,7 +201,7 @@ public class AccordKeyspace
     private static final LocalPartitioner FOR_KEYS_LOCAL_PARTITIONER =
         new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, BytesType.instance, KEY_TYPE));
 
-    private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexSliceFilter(Slices.ALL, false);
+    private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), Clustering.EMPTY), false);
 
     //TODO (now, performance): should this be partitioner rather than TableId? As of this patch distributed tables should only have 1 partitioner...
     private static final ConcurrentMap<TableId, AccordRoutingKeyByteSource.Serializer> TABLE_SERIALIZERS = new ConcurrentHashMap<>();
@@ -571,6 +571,8 @@ public class AccordKeyspace
               + "data blob, "
               + "PRIMARY KEY((store_id, key_token, key))"
               + ')')
+               // TODO (expected): make this uncompressed, as not very compressible (except perhaps the primary key, but could switch to operating on tokens directly)
+//               + " WITH compression = {'enabled':'false'};")
         .partitioner(FOR_KEYS_LOCAL_PARTITIONER)
         .build();
     }
@@ -992,12 +994,12 @@ public class AccordKeyspace
 
     public static UntypedResultSet loadCommandRow(CommandStore commandStore, TxnId txnId)
     {
-        String cql = "SELECT * FROM %s.%s " +
+        String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + COMMANDS + ' ' +
                      "WHERE store_id = ? " +
                      "AND domain = ? " +
                      "AND txn_id=(?, ?, ?)";
 
-        return executeInternal(format(cql, ACCORD_KEYSPACE_NAME, COMMANDS),
+        return executeInternal(cql,
                                commandStore.id(),
                                txnId.domain().ordinal(),
                                txnId.msb, txnId.lsb, txnId.node.id);
@@ -1404,12 +1406,12 @@ public class AccordKeyspace
 
     public static UntypedResultSet loadTimestampsForKeyRow(CommandStore commandStore, PartitionKey key)
     {
-        String cql = "SELECT * FROM %s.%s " +
+        String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + TIMESTAMPS_FOR_KEY + ' ' +
                      "WHERE store_id = ? " +
                      "AND key_token = ? " +
                      "AND key=(?, ?)";
 
-        return executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TIMESTAMPS_FOR_KEY),
+        return executeInternal(cql,
                                commandStore.id(),
                                serializeToken(key.token()),
                                key.table().asUUID(), key.partitionKey().getKey());
@@ -1624,9 +1626,9 @@ public class AccordKeyspace
 
     private static EpochDiskState saveEpochDiskState(EpochDiskState diskState)
     {
-        String cql = "INSERT INTO %s.%s (key, min_epoch, max_epoch) VALUES (0, 
?, ?);";
-        executeInternal(format(cql, ACCORD_KEYSPACE_NAME, EPOCH_METADATA),
-                        diskState.minEpoch, diskState.maxEpoch);
+        String cql = "INSERT INTO " + ACCORD_KEYSPACE_NAME + '.' + 
EPOCH_METADATA + ' ' +
+                     "(key, min_epoch, max_epoch) VALUES (0, ?, ?);";
+        executeInternal(cql, diskState.minEpoch, diskState.maxEpoch);
         return diskState;
     }
 
@@ -1634,7 +1636,8 @@ public class AccordKeyspace
     @VisibleForTesting
     public static EpochDiskState loadEpochDiskState()
     {
-        String cql = "SELECT * FROM %s.%s WHERE key=0";
+        String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + 
EPOCH_METADATA + ' ' +
+                     "WHERE key=0";
         UntypedResultSet result = executeInternal(format(cql, 
ACCORD_KEYSPACE_NAME, EPOCH_METADATA));
         if (result.isEmpty())
             return null;
@@ -1668,8 +1671,9 @@ public class AccordKeyspace
 
         try
         {
-            String cql = "UPDATE %s.%s SET topology=? WHERE epoch=?";
-            executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+            String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + 
' ' +
+                         "SET topology=? WHERE epoch=?";
+            executeInternal(cql,
                             serialize(topology, 
LocalVersionedSerializers.topology), topology.epoch());
             flush(Topologies);
         }
@@ -1684,8 +1688,9 @@ public class AccordKeyspace
     public static EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
-        String cql = "UPDATE %s.%s SET remote_sync_complete = remote_sync_complete + ? WHERE epoch = ?";
-        executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+        String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "SET remote_sync_complete = remote_sync_complete + ? WHERE epoch = ?";
+        executeInternal(cql,
+        executeInternal(cql,
                         Collections.singleton(node.id), epoch);
         flush(Topologies);
         return diskState;
@@ -1694,8 +1699,9 @@ public class AccordKeyspace
     public static EpochDiskState markClosed(Ranges ranges, long epoch, EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
-        String cql = "UPDATE %s.%s SET closed = closed + ? WHERE epoch = ?";
-        executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+        String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "SET closed = closed + ? WHERE epoch = ?";
+        executeInternal(cql,
                         KeySerializers.rangesToBlobMap(ranges), epoch);
         flush(Topologies);
         return diskState;
@@ -1704,8 +1710,9 @@ public class AccordKeyspace
     public static EpochDiskState markRedundant(Ranges ranges, long epoch, EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
-        String cql = "UPDATE %s.%s SET redundant = redundant + ? WHERE epoch = ?";
-        executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+        String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "SET redundant = redundant + ? WHERE epoch = ?";
+        executeInternal(cql,
                         KeySerializers.rangesToBlobMap(ranges), epoch);
         flush(Topologies);
         return diskState;
@@ -1714,8 +1721,9 @@ public class AccordKeyspace
     public static EpochDiskState setNotifyingLocalSync(long epoch, Set<Node.Id> pending, EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
-        String cql = "UPDATE %s.%s SET sync_state = ?, pending_sync_notify = ? WHERE epoch = ?";
-        executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+        String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "SET sync_state = ?, pending_sync_notify = ? WHERE epoch = ?";
+        executeInternal(cql,
                         SyncStatus.NOTIFYING.ordinal(),
                         pending.stream().map(i -> i.id).collect(Collectors.toSet()),
                         epoch);
@@ -1725,8 +1733,9 @@ public class AccordKeyspace
     public static EpochDiskState markLocalSyncAck(Node.Id node, long epoch, EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
-        String cql = "UPDATE %s.%s SET pending_sync_notify = pending_sync_notify - ? WHERE epoch = ?";
-        executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+        String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "SET pending_sync_notify = pending_sync_notify - ? WHERE epoch = ?";
+        executeInternal(cql,
                         Collections.singleton(node.id), epoch);
         return diskState;
     }
@@ -1734,8 +1743,9 @@ public class AccordKeyspace
     public static EpochDiskState setCompletedLocalSync(long epoch, EpochDiskState diskState)
     {
         diskState = maybeUpdateMaxEpoch(diskState, epoch);
-        String cql = "UPDATE %s.%s SET sync_state = ?, pending_sync_notify = {} WHERE epoch = ?";
-        executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES),
+        String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "SET sync_state = ?, pending_sync_notify = {} WHERE epoch = ?";
+        executeInternal(cql,
                         SyncStatus.COMPLETED.ordinal(),
                         epoch);
         return diskState;
@@ -1748,8 +1758,9 @@ public class AccordKeyspace
             long delete = diskState.minEpoch;
             diskState = diskState.withNewMinEpoch(delete + 1);
             saveEpochDiskState(diskState);
-            String cql = "DELETE FROM %s.%s WHERE epoch = ?";
-            executeInternal(format(cql, ACCORD_KEYSPACE_NAME, TOPOLOGIES), delete);
+            String cql = "DELETE FROM " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                         "WHERE epoch = ?";
+            executeInternal(cql, delete);
         }
         return diskState;
     }
@@ -1762,7 +1773,8 @@ public class AccordKeyspace
     @VisibleForTesting
     public static void loadEpoch(long epoch, TopologyLoadConsumer consumer) throws IOException
     {
-        String cql = format("SELECT * FROM %s.%s WHERE epoch=?", ACCORD_KEYSPACE_NAME, TOPOLOGIES);
+        String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' +
+                     "WHERE epoch=?";
 
         UntypedResultSet result = executeInternal(cql, epoch);
         checkState(!result.isEmpty(), "Nothing found for epoch %d", epoch);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 7346a6eebf..160813d722 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -347,7 +347,7 @@ public class AccordObjectSizes
             return size;
 
         Command.Committed committed = command.asCommitted();
-        size += WaitingOnSerializer.serializedSize(committed.waitingOn);
+        size += WaitingOnSerializer.serializedSize(command.txnId(), committed.waitingOn);
 
         return size;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index d3954a208a..36f001162d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -296,7 +296,7 @@ public class AccordService implements IAccordService, Shutdownable
 
     public static long uniqueNow()
     {
-        // TODO (correctness, now): This is not unique it's just currentTimeMillis as microseconds
+        // TODO (now, correctness): This is not unique it's just currentTimeMillis as microseconds
         return TimeUnit.MILLISECONDS.toMicros(Clock.Global.currentTimeMillis());
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index 070fcfa0e6..e506bbf85c 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -223,8 +223,7 @@ public class CheckStatusSerializers
                     Writes writes = CommandSerializers.nullableWrites.deserialize(in, version);
 
                     Result result = null;
-                    if (maxKnowledgeStatus == SaveStatus.PreApplied || maxKnowledgeStatus == SaveStatus.Applied
-                        || maxKnowledgeStatus == SaveStatus.TruncatedApply || maxKnowledgeStatus == SaveStatus.TruncatedApplyWithOutcome || maxKnowledgeStatus == SaveStatus.TruncatedApplyWithDeps)
+                    if (maxKnowledgeStatus.known.outcome.isOrWasApply())
                         result = CommandSerializers.APPLIED;
 
                     return createOk(map, maxKnowledgeStatus, maxStatus, maxPromised, maxAcceptedOrCommitted, acceptedOrCommitted, executeAt,
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index fbc3aeb22f..fe16d3033e 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import com.google.common.base.Preconditions;
 
@@ -99,6 +100,13 @@ public class CommandSerializers
             TopologySerializers.nodeId.serialize(ts.node, out, version);
         }
 
+        public void serialize(T ts, DataOutputPlus out) throws IOException
+        {
+            out.writeLong(ts.msb);
+            out.writeLong(ts.lsb);
+            TopologySerializers.NodeIdSerializer.serialize(ts.node, out);
+        }
+
         public <V> int serialize(T ts, V dst, ValueAccessor<V> accessor, int offset)
         {
             int position = offset;
@@ -110,6 +118,13 @@ public class CommandSerializers
             return size;
         }
 
+        public void serialize(T ts, ByteBuffer out)
+        {
+            out.putLong(ts.msb);
+            out.putLong(ts.lsb);
+            TopologySerializers.nodeId.serialize(ts.node, out);
+        }
+
         @Override
         public T deserialize(DataInputPlus in, int version) throws IOException
         {
@@ -118,6 +133,13 @@ public class CommandSerializers
                                   TopologySerializers.nodeId.deserialize(in, version));
         }
 
+        public T deserialize(DataInputPlus in) throws IOException
+        {
+            return factory.create(in.readLong(),
+                                  in.readLong(),
+                                  TopologySerializers.NodeIdSerializer.deserialize(in));
+        }
+
         public <V> T deserialize(V src, ValueAccessor<V> accessor, int offset)
         {
             long msb = accessor.getLong(src, offset);
@@ -128,6 +150,16 @@ public class CommandSerializers
             return factory.create(msb, lsb, node);
         }
 
+        public T deserialize(ByteBuffer buffer, int position)
+        {
+            long msb = buffer.getLong(position);
+            position += TypeSizes.LONG_SIZE;
+            long lsb = buffer.getLong(position);
+            position += TypeSizes.LONG_SIZE;
+            Node.Id node = TopologySerializers.nodeId.deserialize(buffer, position);
+            return factory.create(msb, lsb, node);
+        }
+
         @Override
         public long serializedSize(T ts, int version)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
index 370f88b73d..4512776154 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/FetchSerializers.java
@@ -178,17 +178,8 @@ public class FetchSerializers
             Writes writes = CommandSerializers.nullableWrites.deserialize(in, version);
 
             Result result = null;
-            switch (maxSaveStatus)
-            {
-                case PreApplied:
-                case Applying:
-                case Applied:
-                case TruncatedApply:
-                case TruncatedApplyWithOutcome:
-                case TruncatedApplyWithDeps:
-                    result = CommandSerializers.APPLIED;
-                    break;
-            }
+            if (achieved.outcome.isOrWasApply())
+                result = CommandSerializers.APPLIED;
 
             return Propagate.SerializerSupport.create(txnId,
                                                       route,
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index c5c2f9a382..4693c03c5c 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 
@@ -46,7 +47,7 @@ public class TopologySerializers
     {
         private NodeIdSerializer() {}
 
-        private static void serialize(Node.Id id, DataOutputPlus out) throws IOException
+        public static void serialize(Node.Id id, DataOutputPlus out) throws IOException
         {
             out.writeInt(id.id);
         }
@@ -68,7 +69,12 @@ public class TopologySerializers
             return accessor.putInt(dst, offset, id.id);
         }
 
-        private static Node.Id deserialize(DataInputPlus in) throws IOException
+        public void serialize(Node.Id id, ByteBuffer out)
+        {
+            out.putInt(id.id);
+        }
+
+        public static Node.Id deserialize(DataInputPlus in) throws IOException
         {
             return new Node.Id(in.readInt());
         }
@@ -90,6 +96,11 @@ public class TopologySerializers
             return new Node.Id(accessor.getInt(src, offset));
         }
 
+        public <V> Node.Id deserialize(ByteBuffer src, int position)
+        {
+            return new Node.Id(src.getInt(position));
+        }
+
         public int serializedSize()
         {
             return TypeSizes.INT_SIZE;  // id.id
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
index 3efb9e2c6c..6c22d28440 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
@@ -21,9 +21,11 @@ package org.apache.cassandra.service.accord.serializers;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import accord.local.Command;
 import accord.local.Command.WaitingOn;
 import accord.primitives.Keys;
 import accord.primitives.Routable;
+import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.utils.ImmutableBitSet;
 import accord.utils.Invariants;
@@ -32,14 +34,18 @@ import accord.utils.SortedArrays.SortedArrayList;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.vint.VIntCoding;
 
 public class WaitingOnSerializer
 {
     public static void serialize(TxnId txnId, WaitingOn waitingOn, DataOutputPlus out) throws IOException
     {
-        out.writeUnsignedVInt32(waitingOn.keys.size());
-        out.writeUnsignedVInt32(waitingOn.txnIds.size());
+        if (txnId.kind().awaitsOnlyDeps())
+        {
+            Timestamp executeAtLeast = waitingOn.executeAtLeast();
+            out.writeBoolean(executeAtLeast != null);
+            if (executeAtLeast != null)
+                CommandSerializers.timestamp.serialize(executeAtLeast, out);
+        }
         int keyCount = waitingOn.keys.size();
         int txnIdCount = waitingOn.txnIds.size();
         int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
@@ -53,8 +59,12 @@ public class WaitingOnSerializer
 
     public static WaitingOn deserialize(TxnId txnId, Keys keys, SortedArrayList<TxnId> txnIds, DataInputPlus in) throws IOException
     {
-        int a = in.readUnsignedVInt32();
-        int b = in.readUnsignedVInt32();
+        Timestamp executeAtLeast = null;
+        if (txnId.kind().awaitsOnlyDeps())
+        {
+            if (in.readBoolean())
+                executeAtLeast = CommandSerializers.timestamp.deserialize(in);
+        }
         int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
         ImmutableBitSet waitingOn = deserialize(waitingOnLength, in);
         ImmutableBitSet appliedOrInvalidated = null;
@@ -63,17 +73,26 @@ public class WaitingOnSerializer
             int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
             appliedOrInvalidated = deserialize(appliedOrInvalidatedLength, in);
         }
-        return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
+
+        WaitingOn result = new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
+        if (executeAtLeast != null)
+            result = new Command.WaitingOnWithExecuteAt(result, executeAtLeast);
+        return result;
     }
 
-    public static long serializedSize(WaitingOn waitingOn)
+    public static long serializedSize(TxnId txnId, WaitingOn waitingOn)
     {
         int keyCount = waitingOn.keys.size();
         int txnIdCount = waitingOn.txnIds.size();
         int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
         long size = serializedSize(waitingOnLength, waitingOn.waitingOn);
-        size += TypeSizes.sizeofUnsignedVInt(keyCount);
-        size += TypeSizes.sizeofUnsignedVInt(txnIdCount);
+        if (txnId.kind().awaitsOnlyDeps())
+        {
+            Timestamp executeAtLeast = waitingOn.executeAtLeast();
+            size += 1;
+            if (executeAtLeast != null)
+                size += CommandSerializers.timestamp.serializedSize();
+        }
         if (waitingOn.appliedOrInvalidated == null)
             return size;
 
@@ -113,10 +132,24 @@ public class WaitingOnSerializer
         if (txnId.domain() == Routable.Domain.Range)
             appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
 
-        ByteBuffer out = ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(keyCount) + TypeSizes.sizeofUnsignedVInt(txnIdCount)
-                                             + TypeSizes.LONG_SIZE * (waitingOnLength + appliedOrInvalidatedLength));
-        VIntCoding.writeUnsignedVInt32(keyCount, out);
-        VIntCoding.writeUnsignedVInt32(txnIdCount, out);
+        int size = TypeSizes.LONG_SIZE * (waitingOnLength + appliedOrInvalidatedLength);
+        Timestamp executeAtLeast = null;
+        if (txnId.kind().awaitsOnlyDeps())
+        {
+            executeAtLeast = waitingOn.executeAtLeast();
+            size += 1;
+            if (executeAtLeast != null)
+                size += CommandSerializers.timestamp.serializedSize();
+        }
+
+        ByteBuffer out = ByteBuffer.allocate(size);
+        if (txnId.kind().awaitsOnlyDeps())
+        {
+            out.put((byte)(executeAtLeast != null ? 1 : 0));
+            if (executeAtLeast != null)
+                CommandSerializers.timestamp.serialize(executeAtLeast, out);
+        }
+
         serialize(waitingOnLength, waitingOn.waitingOn, out);
         if (appliedOrInvalidatedLength > 0)
             serialize(appliedOrInvalidatedLength, waitingOn.appliedOrInvalidated, out);
@@ -133,12 +166,18 @@ public class WaitingOnSerializer
 
     public static WaitingOn deserialize(TxnId txnId, Keys keys, SortedArrayList<TxnId> txnIds, ByteBuffer in) throws IOException
     {
-        int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
         int position = in.position();
-        int a = VIntCoding.readUnsignedVInt32(in, position);
-        position += TypeSizes.sizeofUnsignedVInt(a);
-        int b = VIntCoding.readUnsignedVInt32(in, position);
-        position += TypeSizes.sizeofUnsignedVInt(a);
+        Timestamp executeAtLeast = null;
+        if (txnId.kind().awaitsOnlyDeps())
+        {
+            if (in.get(position++) != 0)
+            {
+                executeAtLeast = CommandSerializers.timestamp.deserialize(in, position);
+                position += CommandSerializers.timestamp.serializedSize();
+            }
+        }
+
+        int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
         ImmutableBitSet waitingOn = deserialize(position, waitingOnLength, in);
         ImmutableBitSet appliedOrInvalidated = null;
         if (txnId.domain() == Routable.Domain.Range)
@@ -147,7 +186,11 @@ public class WaitingOnSerializer
             int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
             appliedOrInvalidated = deserialize(position, appliedOrInvalidatedLength, in);
         }
-        return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
+
+        WaitingOn result = new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
+        if (executeAtLeast != null)
+            result = new Command.WaitingOnWithExecuteAt(result, executeAtLeast);
+        return result;
     }
 
     private static ImmutableBitSet deserialize(int position, int length, ByteBuffer in)
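
Note: the new layout above drops the vint key/txnId counts (both are recoverable from the keys and txnIds arguments at deserialization time), prefixes an optional executeAtLeast with a single presence byte for awaitsOnlyDeps transactions, and stores each bit set as ceil(n / 64) longs. A self-contained sketch of that framing and arithmetic in plain java.nio; names are illustrative, and the optional field is modeled as a bare long rather than an Accord Timestamp:

    import java.nio.ByteBuffer;

    public class WaitingOnLayoutSketch
    {
        // Number of 64-bit words needed to hold `bits` bits: ceil(bits / 64).
        static int wordsFor(int bits)
        {
            return (bits + 63) / 64;
        }

        public static void main(String[] args)
        {
            int keyCount = 5, txnIdCount = 70;
            int waitingOnLength = wordsFor(keyCount + txnIdCount); // 75 bits -> 2 longs

            Long executeAtLeast = null; // optional field, absent in this example
            int size = 1 + 8 * waitingOnLength; // presence byte + bitset words
            if (executeAtLeast != null)
                size += 8;

            ByteBuffer out = ByteBuffer.allocate(size);
            out.put((byte) (executeAtLeast != null ? 1 : 0)); // presence flag
            if (executeAtLeast != null)
                out.putLong(executeAtLeast);
            for (int i = 0; i < waitingOnLength; i++)
                out.putLong(0L); // the bitset words would be written here
            assert !out.hasRemaining();
        }
    }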
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 7965decddb..009930955b 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -1000,7 +1000,7 @@ public class ByteBufferUtil
 
     public static void writeLeastSignificantBytes(long register, int bytes, ByteBuffer out)
     {
-        writeMostSignificantBytesSlow(register << ((8 - bytes)*8), bytes, out);
+        writeMostSignificantBytes(register << ((8 - bytes)*8), bytes, out);
     }
 
     public static void writeMostSignificantBytes(long register, int bytes, ByteBuffer out)
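
Note: the fix above routes writeLeastSignificantBytes through the fast writeMostSignificantBytes path instead of the Slow variant. The pre-shift is what makes that legal: register << ((8 - bytes) * 8) moves the low `bytes` bytes into the most significant positions. A small illustration of the shift (plain Java, values chosen arbitrarily):

    public class ShiftSketch
    {
        public static void main(String[] args)
        {
            long register = 0x1122334455667788L;
            int bytes = 3; // we want the 3 least significant bytes: 66 77 88

            long shifted = register << ((8 - bytes) * 8); // 0x6677880000000000
            // The 3 *most* significant bytes of `shifted` are now 66 77 88,
            // so a most-significant-bytes writer emits exactly the bytes we wanted.
            assert (shifted >>> 40) == 0x667788L;
        }
    }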
diff --git a/test/conf/logback-simulator.xml b/test/conf/logback-simulator.xml
index a4c24aab8d..fe823383ee 100644
--- a/test/conf/logback-simulator.xml
+++ b/test/conf/logback-simulator.xml
@@ -16,7 +16,7 @@
   ~ limitations under the License.
   -->
 
-<configuration debug="false" scan="true" scanPeriod="60 seconds">
+<configuration debug="true" scan="true" scanPeriod="60 seconds">
   <define name="run_start" class="org.apache.cassandra.simulator.logging.RunStartDefiner" scope="system"/>
   <define name="run_seed" class="org.apache.cassandra.simulator.logging.SeedDefiner" scope="system"/>
   <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner"/>
diff --git a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index ce0e95db03..fdc276de0b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableSet;
 import org.junit.After;
 import org.junit.BeforeClass;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.Duration;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BooleanType;
@@ -62,6 +63,8 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.service.accord.AccordStateCache;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.cassandra.config.CassandraRelevantProperties.JOIN_RING;
 import static org.apache.cassandra.config.CassandraRelevantProperties.RESET_BOOTSTRAP_PROGRESS;
 import static org.apache.cassandra.config.CassandraRelevantProperties.SKIP_GC_INSPECTOR;
@@ -81,6 +84,7 @@ public class TestBaseImpl extends DistributedTestBase
     @BeforeClass
     public static void beforeClass() throws Throwable
     {
+        CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
         ICluster.setup();
         SKIP_GC_INSPECTOR.setBoolean(true);
         AccordStateCache.validateLoadOnEvict(true);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 42e7fbf34a..8e663ab966 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -19,9 +19,15 @@
 package org.apache.cassandra.distributed.test.accord;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Date;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.util.concurrent.RateLimiter;
 import org.junit.BeforeClass;
@@ -30,10 +36,16 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.utils.EstimatedHistogram;
 
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class AccordLoadTest extends AccordTestBase
@@ -43,20 +55,35 @@ public class AccordLoadTest extends AccordTestBase
     @BeforeClass
     public static void setUp() throws IOException
     {
-        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("lwt_strategy", "accord").set("non_serial_write_strategy", "accord")), 2);
+        CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
+        AccordTestBase.setupCluster(builder -> builder, 2);
     }
 
     @Ignore
     @Test
     public void testLoad() throws Exception
     {
-        test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY KEY(k))",
+        test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY KEY(k)) WITH transactional_mode = 'full'",
              cluster -> {
+
+                final ConcurrentHashMap<Verb, AtomicInteger> verbs = new ConcurrentHashMap<>();
+                cluster.filters().outbound().messagesMatching(new IMessageFilters.Matcher()
+                {
+                    @Override
+                    public boolean matches(int i, int i1, IMessage iMessage)
+                    {
+                        verbs.computeIfAbsent(Verb.fromId(iMessage.verb()), ignore -> new AtomicInteger()).incrementAndGet();
+                        return false;
+                    }
+                }).drop();
                  ICoordinator coordinator = cluster.coordinator(1);
+                 final int repairInterval = 3000;
                  final int batchSize = 1000;
                  final int concurrency = 100;
                  final int ratePerSecond = 1000;
                  final int keyCount = 10;
+                 final float readChance = 0.33f;
+                 long nextRepairAt = repairInterval;
                  for (int i = 1; i <= keyCount; i++)
                     coordinator.execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (0, 0) USING TIMESTAMP 0;", ConsistencyLevel.ALL, i);
 
@@ -75,14 +102,66 @@ public class AccordLoadTest extends AccordTestBase
                          inFlight.acquire();
                          rateLimiter.acquire();
                          long commandStart = System.nanoTime();
-                         coordinator.executeWithResult((success, fail) -> {
-                             inFlight.release();
-                             if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
-//                             else exceptions.add(fail);
-                         }, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount));
+                         if (random.nextFloat() < readChance)
+                         {
+                             coordinator.executeWithResult((success, fail) -> {
+                                 inFlight.release();
+                                 if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+                                 //                             else exceptions.add(fail);
+                             }, "SELECT * FROM " + qualifiedTableName + " WHERE k = ?;", ConsistencyLevel.SERIAL, random.nextInt(keyCount));
+                         }
+                         else
+                         {
+                             coordinator.executeWithResult((success, fail) -> {
+                                 inFlight.release();
+                                 if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+    //                             else exceptions.add(fail);
+                             }, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount));
+                         }
+                     }
+
+                     if ((nextRepairAt -= batchSize) <= 0)
+                     {
+                         nextRepairAt += repairInterval;
+                         System.out.println("repairing...");
+                         cluster.coordinator(1).instance().nodetool("repair", qualifiedTableName);
+                     }
+
+                     final Date date = new Date();
+                     System.out.printf("%tT rate: %.2f/s\n", date, (((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart)));
+                     System.out.printf("%tT percentiles: %d %d %d %d\n", date, histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+
+                     class VerbCount
+                     {
+                         final Verb verb;
+                         final int count;
+
+                         VerbCount(Verb verb, int count)
+                         {
+                             this.verb = verb;
+                             this.count = count;
+                         }
+                     }
+                     List<VerbCount> verbCounts = new ArrayList<>();
+                     for (Map.Entry<Verb, AtomicInteger> e : verbs.entrySet())
+                     {
+                         int count = e.getValue().getAndSet(0);
+                         if (count != 0) verbCounts.add(new VerbCount(e.getKey(), count));
+                     }
+                     verbCounts.sort(Comparator.comparing(v -> -v.count));
+
+                     StringBuilder verbSummary = new StringBuilder();
+                     for (VerbCount vs : verbCounts)
+                     {
+                         {
+                             if (verbSummary.length() > 0)
+                                 verbSummary.append(", ");
+                             verbSummary.append(vs.verb);
+                             verbSummary.append(": ");
+                             verbSummary.append(vs.count);
+                         }
                      }
-                     System.out.printf("%tT rate: %.2f/s\n", new Date(), (((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart)));
-                     System.out.printf("%tT percentiles: %d %d %d %d\n", new Date(), histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+                     System.out.printf("%tT verbs: %s\n", date, verbSummary);
                  }
              }
         );
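
Note: the verb accounting added to this test follows a common snapshot-and-reset pattern: increment a per-key AtomicInteger from the (non-dropping) message filter, then drain each counter with getAndSet(0) every reporting interval. A minimal sketch of the pattern in isolation; the verb names and class are illustrative:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class VerbCountSketch
    {
        static final ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

        // Hot path: safe to call concurrently from many threads.
        static void record(String verb)
        {
            counts.computeIfAbsent(verb, ignore -> new AtomicInteger()).incrementAndGet();
        }

        public static void main(String[] args)
        {
            record("READ_REQ");
            record("READ_REQ");
            record("WRITE_REQ");

            // Reporting path: drain each counter so the next interval starts from zero.
            for (Map.Entry<String, AtomicInteger> e : counts.entrySet())
            {
                int sinceLast = e.getValue().getAndSet(0);
                if (sinceLast != 0)
                    System.out.println(e.getKey() + ": " + sinceLast);
            }
        }
    }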
diff --git a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
index 60a71b4b3b..8ff2aefe74 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.io.compress.LZ4Compressor;
 import org.apache.cassandra.io.filesystem.ListenableFileSystem;
 import org.apache.cassandra.io.util.FileSystems;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.service.paxos.BallotGenerator;
 import org.apache.cassandra.service.paxos.PaxosPrepare;
 import org.apache.cassandra.simulator.RandomSource.Choices;
@@ -198,7 +199,7 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable
         protected HeapPool.Logged.Listener memoryListener;
         protected SimulatedTime.Listener timeListener = (i1, i2) -> {};
         protected LongConsumer onThreadLocalRandomCheck;
-        protected String lwtStrategy = "migration";
+        protected String transactionalMode = "full";
 
         public Builder<S> failures(Failures failures)
         {
@@ -575,12 +576,17 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable
             return this;
         }
 
-        public Builder<S> lwtStrategy(String strategy)
+        public Builder<S> transactionalMode(String mode)
         {
-            this.lwtStrategy = strategy;
+            this.transactionalMode = mode;
             return this;
         }
 
+        public TransactionalMode transactionalMode()
+        {
+            return TransactionalMode.fromString(transactionalMode);
+        }
+
         public abstract ClusterSimulation<S> create(long seed) throws IOException;
     }
 
@@ -774,7 +780,6 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable
                                    .set("use_deterministic_table_id", true)
                                    .set("disk_access_mode", "standard")
                                    .set("failure_detector", SimulatedFailureDetector.Instance.class.getName())
-                                   .set("lwt_strategy", builder.lwtStrategy)
                                    .set("commitlog_compression", new ParameterizedClass(LZ4Compressor.class.getName(), emptyMap()));
                              ;
                              configUpdater.accept(threadAllocator.update(config));
@@ -866,6 +871,7 @@ public class ClusterSimulation<S extends Simulation> implements AutoCloseable
             simulated.register((SimulatedFutureActionScheduler) futureActionScheduler);
 
         scheduler = builder.schedulerFactory.create(random);
+        // TODO (required): we aren't passing paxos variant change parameter anymore
         options = new ClusterActions.Options(builder.topologyChangeLimit, Choices.uniform(KindOfSequence.values()).choose(random).period(builder.topologyChangeIntervalNanos, random),
                                              Choices.random(random, builder.topologyChanges),
                                              builder.consensusChangeLimit, Choices.uniform(KindOfSequence.values()).choose(random).period(builder.consensusChangeIntervalNanos, random),
diff --git a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
index 05f3588911..e5c875cb6e 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
@@ -344,7 +344,7 @@ public class SimulationRunner
                 builder.debug(debugLevels, debugPrimaryKeys);
             }
 
-            Optional.ofNullable(lwtStrategy).ifPresent(builder::lwtStrategy);
+            Optional.ofNullable(lwtStrategy).ifPresent(builder::transactionalMode);
         }
 
         public void run(B builder) throws IOException
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
index a75a1ef461..ee8fd0ca49 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AccordClusterSimulation.java
@@ -71,11 +71,11 @@ class AccordClusterSimulation extends ClusterSimulation<PaxosSimulation> impleme
                  int[] primaryKeys = primaryKeys(seed, builder.primaryKeyCount());
                  KindOfSequence.Period jitter = RandomSource.Choices.uniform(KindOfSequence.values()).choose(random)
                                                                     .period(builder.schedulerJitterNanos(), random);
-                  return new PairOfSequencesAccordSimulation(simulated, cluster, options,
-                                                            builder.readChance().select(random), builder.concurrency(), builder.primaryKeySeconds(), builder.withinKeyConcurrency(),
-                                                            SERIAL, schedulers, builder.debug(), seed,
-                                                            primaryKeys, builder.secondsToSimulate() >= 0 ? SECONDS.toNanos(builder.secondsToSimulate()) : -1,
-                                                            () -> jitter.get(random));
+                  return new PairOfSequencesAccordSimulation(simulated, cluster, options, builder.transactionalMode(),
+                                                             builder.readChance().select(random), builder.concurrency(), builder.primaryKeySeconds(), builder.withinKeyConcurrency(),
+                                                             SERIAL, schedulers, builder.debug(), seed,
+                                                             primaryKeys, builder.secondsToSimulate() >= 0 ? SECONDS.toNanos(builder.secondsToSimulate()) : -1,
+                                                             () -> jitter.get(random));
               });
     }
 
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
index 8d6c8a0dcc..7965c29fc1 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.impl.Query;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.simulator.Action;
 import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
@@ -123,10 +124,12 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo
 
     private final float writeRatio;
     private final HistoryValidator validator;
+    private final TransactionalMode transactionalMode;
 
     public PairOfSequencesAccordSimulation(SimulatedSystems simulated,
                                            Cluster cluster,
                                            ClusterActions.Options clusterOptions,
+                                           TransactionalMode transactionalMode,
                                            float readRatio,
                                            int concurrency, IntRange simulateKeyForSeconds, IntRange withinKeyConcurrency,
                                            ConsistencyLevel serialConsistency, RunnableActionScheduler scheduler, Debug debug,
@@ -139,6 +142,7 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo
               scheduler, debug,
               seed, primaryKeys,
               runForNanos, jitter);
+        this.transactionalMode = transactionalMode;
         this.writeRatio = 1F - readRatio;
         HistoryValidator validator = new StrictSerializabilityValidator(primaryKeys);
         if (CassandraRelevantProperties.TEST_HISTORY_VALIDATOR_LOGGING_ENABLED.getBoolean())
@@ -149,7 +153,7 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo
     @Override
     protected String createTableStmt()
     {
-        return "CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk))";
+        return String.format("CREATE TABLE " + KEYSPACE + ".tbl (pk int, count int, seq text, PRIMARY KEY (pk)) WITH transactional_mode = '%s'", transactionalMode);
     }
 
     @Override
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
index 6e7d058bea..c54ba1c26d 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosClusterSimulation.java
@@ -69,11 +69,6 @@ class PaxosClusterSimulation extends ClusterSimulation<PaxosSimulation> implemen
             random.reset(seed);
             return new PaxosClusterSimulation(random, seed, uniqueNum, this);
         }
-
-        public String lwtStrategy()
-        {
-            return lwtStrategy;
-        }
     }
 
     PaxosClusterSimulation(RandomSource random, long seed, int uniqueNum, Builder builder) throws IOException
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
index 71734c6e68..095c79769a 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulationRunner.java
@@ -69,7 +69,7 @@ public class PaxosSimulationRunner extends SimulationRunner
         @Override
         protected void run( long seed, PaxosClusterSimulation.Builder builder) throws IOException
         {
-            if (Objects.equals(builder.lwtStrategy(), "accord"))
+            if (Objects.equals(builder.transactionalMode(), "accord"))
             {
                 // Apply handicaps
                 builder.dcs(new IntRange(1, 1));
diff --git a/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java b/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
index 5b83ee88f1..b5df2b6b22 100644
--- a/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
+++ b/test/unit/org/apache/cassandra/journal/SyncedOffsetsTest.java
@@ -57,9 +57,9 @@ public class SyncedOffsetsTest
 
         Descriptor descriptor = Descriptor.create(directory, System.currentTimeMillis(), 1);
 
-        SyncedOffsets active = SyncedOffsets.active(descriptor, syncOnMark);
+        SyncedOffsets active = SyncedOffsets.active(descriptor);
         for (int i = 0; i < n; i++)
-            active.mark(i);
+            active.mark(i, syncOnMark);
         assertEquals(n - 1, active.syncedOffset());
         active.close();
 
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
index 3df7d87e0d..07b626346a 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
@@ -57,7 +57,7 @@ public class WaitingOnSerializerTest
             TxnId txnId = TxnId.NONE;
             if (waitingOn.appliedOrInvalidated != null) txnId = new TxnId(txnId.epoch(), txnId.hlc(), txnId.kind(), Routable.Domain.Range, txnId.node);
             buffer.clear();
-            long expectedSize = WaitingOnSerializer.serializedSize(waitingOn);
+            long expectedSize = WaitingOnSerializer.serializedSize(txnId, waitingOn);
             WaitingOnSerializer.serialize(txnId, waitingOn, buffer);
             Assertions.assertThat(buffer.getLength()).isEqualTo(expectedSize);
             Command.WaitingOn read = WaitingOnSerializer.deserialize(txnId, waitingOn.keys, waitingOn.txnIds, new DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

