This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f9f2d93447add98dc4784a331310245138312183
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Sun Jan 8 22:06:38 2023 +0000

    Shard local CommandStores on contiguous ranges
    
    patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18142
---
 .build/include-accord.sh                           |   2 +-
 .../cassandra/db/marshal/ByteArrayAccessor.java    |   2 +
 .../apache/cassandra/dht/AccordBytesSplitter.java  |  89 ++++
 .../org/apache/cassandra/dht/AccordSplitter.java   | 103 ++++
 .../cassandra/dht/ByteOrderedPartitioner.java      |   8 +
 .../org/apache/cassandra/dht/IPartitioner.java     |   3 +
 .../org/apache/cassandra/dht/LocalPartitioner.java |   8 +
 .../apache/cassandra/dht/Murmur3Partitioner.java   |  22 +
 .../cassandra/dht/OrderPreservingPartitioner.java  |  65 ++-
 .../apache/cassandra/dht/RandomPartitioner.java    |  20 +
 src/java/org/apache/cassandra/dht/Splitter.java    |   8 +-
 src/java/org/apache/cassandra/schema/TableId.java  |   2 +-
 .../cassandra/service/accord/AccordCommand.java    |  16 +-
 .../service/accord/AccordCommandStore.java         | 516 +++++++++++----------
 .../service/accord/AccordCommandStores.java        |  51 +-
 .../cassandra/service/accord/AccordKeyspace.java   |  22 +-
 .../service/accord/AccordObjectSizes.java          |   3 +-
 .../service/accord/AccordSerializers.java          |   2 +-
 .../cassandra/service/accord/AccordService.java    |   6 +
 .../service/accord/AccordTopologyUtils.java        |  52 +--
 .../cassandra/service/accord/ListenerProxy.java    |   9 +-
 .../cassandra/service/accord/TokenRange.java       |   5 +-
 .../service/accord/api/AccordRoutableKey.java      |  36 +-
 .../service/accord/api/AccordRoutingKey.java       | 105 +++--
 .../cassandra/service/accord/api/PartitionKey.java |  26 +-
 .../service/accord/async/AsyncOperation.java       |   4 +-
 .../cassandra/service/accord/txn/TxnData.java      |  17 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |   2 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    |   2 +-
 .../cassandra/dht/ByteOrderedPartitionerTest.java  |  16 +
 .../apache/cassandra/dht/LengthPartitioner.java    |  34 +-
 .../dht/OrderPreservingPartitionerTest.java        |  20 +-
 .../apache/cassandra/dht/PartitionerTestCase.java  | 115 +++++
 .../service/accord/AccordCommandTest.java          |   7 +-
 .../cassandra/service/accord/AccordTestUtils.java  |  81 +---
 .../service/accord/AccordTopologyTest.java         |   4 +-
 .../service/accord/api/AccordKeyTest.java          |  40 +-
 .../service/accord/async/AsyncOperationTest.java   |   4 +-
 .../accord/serializers/CommandSerializersTest.java |   5 +-
 .../service/accord/txn/AbstractKeySortedTest.java  |   2 +-
 40 files changed, 1005 insertions(+), 529 deletions(-)

diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index b1409603fb..37bdcbe079 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
 accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='63c37e20cfe66a421c1b07ba1f430a9e6aabe4c5'
+accord_branch='804a77d32c8ae45751a3a7f450b372560f08cacc'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git a/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java 
b/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java
index d7108992da..4926027a7b 100644
--- a/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java
+++ b/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java
@@ -25,6 +25,7 @@ import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.UUID;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -107,6 +108,7 @@ public class ByteArrayAccessor implements 
ValueAccessor<byte[]>
     @Override
     public byte[] slice(byte[] input, int offset, int length)
     {
+        Invariants.checkArgument(offset + length <= input.length); // Arrays.copyOfRange would otherwise zero-pad past the end of input
         return Arrays.copyOfRange(input, offset, offset + length);
     }
 
diff --git a/src/java/org/apache/cassandra/dht/AccordBytesSplitter.java 
b/src/java/org/apache/cassandra/dht/AccordBytesSplitter.java
new file mode 100644
index 0000000000..c27bc43599
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/AccordBytesSplitter.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+
+import accord.api.RoutingKey;
+import accord.primitives.Ranges;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+
+import static accord.utils.Invariants.checkArgument;
+import static java.math.BigInteger.ONE;
+import static java.math.BigInteger.ZERO;
+
+/**
+ * Accord splitter for byte-array tokens ({@link ByteOrderedPartitioner.BytesToken}).
+ * <p>
+ * Every token is read as an unsigned big-endian integer {@link #byteLength} bytes wide; shorter tokens are
+ * padded on the right with zero bytes. The width is the longest token found among the bounds of the ranges
+ * supplied at construction.
+ */
+public class AccordBytesSplitter extends AccordSplitter
+{
+    /** Width, in bytes, of the integer space tokens are mapped into. */
+    final int byteLength;
+
+    /**
+     * @param ranges ranges whose start and end bounds determine {@link #byteLength}
+     */
+    protected AccordBytesSplitter(Ranges ranges)
+    {
+        // NOTE(review): AccordSplitter special-cases SentinelKey bounds before calling token(), whereas the
+        // byteLength(RoutingKey) helper used here calls token() unconditionally - confirm that is safe.
+        int longest = 0;
+        for (accord.primitives.Range range : ranges)
+        {
+            longest = Integer.max(longest, byteLength(range.start()));
+            longest = Integer.max(longest, byteLength(range.end()));
+        }
+        this.byteLength = longest;
+    }
+
+    /** @return zero, which is also the value of an empty or all-zero token */
+    @Override
+    BigInteger minimumValue()
+    {
+        return ZERO;
+    }
+
+    /** @return {@code 2^(8 * byteLength) - 1}, i.e. {@link #byteLength} bytes of {@code 0xff} */
+    @Override
+    BigInteger maximumValue()
+    {
+        return ONE.shiftLeft(8 * byteLength).subtract(ONE);
+    }
+
+    /**
+     * Reads the token's bytes as an unsigned big-endian number, placing byte {@code i} at bit offset
+     * {@code 8 * (byteLength - 1 - i)} so that tokens shorter than {@link #byteLength} are padded on the right.
+     *
+     * @param token a {@link ByteOrderedPartitioner.BytesToken} of at most {@link #byteLength} bytes
+     */
+    @Override
+    BigInteger valueForToken(Token token)
+    {
+        byte[] bytes = ((ByteOrderedPartitioner.BytesToken) token).token;
+        checkArgument(bytes.length <= byteLength);
+        BigInteger value = ZERO;
+        for (int i = 0 ; i < bytes.length ; ++i)
+            value = value.add(BigInteger.valueOf(bytes[i] & 0xffL).shiftLeft((byteLength - 1 - i) * 8));
+        return value;
+    }
+
+    /**
+     * Writes the low {@code 8 * byteLength} bits of {@code value}, most significant byte first, into a token of
+     * exactly {@link #byteLength} bytes. Any higher bits of {@code value} are dropped without complaint.
+     *
+     * @param value a non-negative value
+     */
+    @Override
+    Token tokenForValue(BigInteger value)
+    {
+        checkArgument(value.compareTo(ZERO) >= 0);
+        byte[] bytes = new byte[byteLength];
+        for (int i = 0 ; i < bytes.length ; ++i)
+            bytes[i] = value.shiftRight((byteLength - 1 - i) * 8).byteValue();
+        return new ByteOrderedPartitioner.BytesToken(bytes);
+    }
+
+    /** Length in bytes of the token behind a routing key. */
+    private static int byteLength(RoutingKey routingKey)
+    {
+        return byteLength(((AccordRoutingKey) routingKey).token());
+    }
+
+    /** Length in bytes of a {@link ByteOrderedPartitioner.BytesToken}. */
+    private static int byteLength(Token token)
+    {
+        return ((ByteOrderedPartitioner.BytesToken) token).token.length;
+    }
+}
diff --git a/src/java/org/apache/cassandra/dht/AccordSplitter.java 
b/src/java/org/apache/cassandra/dht/AccordSplitter.java
new file mode 100644
index 0000000000..232a47d454
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/AccordSplitter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+
+import accord.local.ShardDistributor;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+
+import static java.math.BigInteger.ZERO;
+
+/**
+ * Base {@link ShardDistributor.EvenSplit.Splitter} over {@link BigInteger} for Cassandra token ranges.
+ * <p>
+ * Subclasses supply the mapping between tokens and integers, plus the integers that stand in for
+ * {@link SentinelKey} bounds; this class derives range sizes and sub-ranges from them and implements the
+ * splitter's arithmetic by delegating to {@link BigInteger}.
+ */
+public abstract class AccordSplitter implements ShardDistributor.EvenSplit.Splitter<BigInteger>
+{
+    /** @return the integer position of {@code token} */
+    abstract BigInteger valueForToken(Token token);
+
+    /** @return the token at integer position {@code value} */
+    abstract Token tokenForValue(BigInteger value);
+
+    /** @return the position used for a range whose start bound is a {@link SentinelKey} */
+    abstract BigInteger minimumValue();
+
+    /** @return the position used for a range whose end bound is a {@link SentinelKey} */
+    abstract BigInteger maximumValue();
+
+    /** @return the position of the range's end bound minus the position of its start bound */
+    @Override
+    public BigInteger sizeOf(accord.primitives.Range range)
+    {
+        BigInteger start = startValue((AccordRoutingKey) range.start());
+        BigInteger end = endValue((AccordRoutingKey) range.end());
+        return end.subtract(start);
+    }
+
+    /**
+     * Cuts the span between {@code startOffset} and {@code endOffset}, both measured from the start of
+     * {@code range}, out of {@code range}. A zero start offset reuses the original start bound and an end
+     * offset equal to the size of the range reuses the original end bound, so sentinel bounds are carried
+     * over unchanged; any other offset becomes a new {@link TokenKey} in the start bound's keyspace.
+     */
+    @Override
+    public accord.primitives.Range subRange(accord.primitives.Range range, BigInteger startOffset, BigInteger endOffset)
+    {
+        AccordRoutingKey startBound = (AccordRoutingKey) range.start();
+        AccordRoutingKey endBound = (AccordRoutingKey) range.end();
+
+        BigInteger start = startValue(startBound);
+        BigInteger sizeOfRange = endValue(endBound).subtract(start);
+
+        String keyspace = startBound.keyspace();
+        return new TokenRange(startOffset.equals(ZERO) ? startBound : new TokenKey(keyspace, tokenForValue(start.add(startOffset))),
+                              endOffset.equals(sizeOfRange) ? endBound : new TokenKey(keyspace, tokenForValue(start.add(endOffset))));
+    }
+
+    /** Position of a range's start bound; a sentinel start maps to {@link #minimumValue()}. */
+    private BigInteger startValue(AccordRoutingKey bound)
+    {
+        return bound instanceof SentinelKey ? minimumValue() : valueForToken(bound.token());
+    }
+
+    /** Position of a range's end bound; a sentinel end maps to {@link #maximumValue()}. */
+    private BigInteger endValue(AccordRoutingKey bound)
+    {
+        return bound instanceof SentinelKey ? maximumValue() : valueForToken(bound.token());
+    }
+
+    // The remaining methods are the arithmetic the splitter interface asks for, delegated to BigInteger.
+    @Override
+    public BigInteger zero()
+    {
+        return ZERO;
+    }
+
+    @Override
+    public BigInteger add(BigInteger a, BigInteger b)
+    {
+        return a.add(b);
+    }
+
+    @Override
+    public BigInteger subtract(BigInteger a, BigInteger b)
+    {
+        return a.subtract(b);
+    }
+
+    @Override
+    public BigInteger divide(BigInteger a, int i)
+    {
+        return a.divide(BigInteger.valueOf(i));
+    }
+
+    @Override
+    public BigInteger multiply(BigInteger a, int i)
+    {
+        return a.multiply(BigInteger.valueOf(i));
+    }
+
+    /** @return the smaller of {@code v} and {@code i}, narrowed to an int */
+    @Override
+    public int min(BigInteger v, int i)
+    {
+        return v.min(BigInteger.valueOf(i)).intValue();
+    }
+
+    @Override
+    public int compare(BigInteger a, BigInteger b)
+    {
+        return a.compareTo(b);
+    }
+}
diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java 
b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
index 788763635a..9732da9475 100644
--- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.dht;
 
+import accord.primitives.Ranges;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.BufferDecoratedKey;
@@ -44,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
 import com.google.common.collect.Maps;
 
@@ -386,4 +388,10 @@ public class ByteOrderedPartitioner implements IPartitioner
     {
         return BytesType.instance;
     }
+
+    @Override
+    public Function<Ranges, AccordSplitter> accordSplitter()
+    {
+        return AccordBytesSplitter::new; // a fresh splitter per Ranges: its byte width is taken from the bounds of those ranges
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java 
b/src/java/org/apache/cassandra/dht/IPartitioner.java
index 0d32dc9c31..a2596f22ed 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
+import java.util.function.Function;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -136,6 +137,8 @@ public interface IPartitioner
         return Optional.empty();
     }
 
+    Function<accord.primitives.Ranges, AccordSplitter> accordSplitter();
+
     default boolean isFixedLength()
     {
         return false;
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java 
b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index ad8461962d..2daf45e7ac 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -22,7 +22,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.function.Function;
 
+import accord.primitives.Ranges;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.CachedHashDecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -207,4 +209,10 @@ public class LocalPartitioner implements IPartitioner
             return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(token);
         }
     }
+
+    @Override
+    public Function<Ranges, AccordSplitter> accordSplitter()
+    {
+        return AccordBytesSplitter::new; // NOTE(review): AccordBytesSplitter casts tokens to ByteOrderedPartitioner.BytesToken - confirm this partitioner's tokens satisfy that
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java 
b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index 1f7f3605e9..4101449b1c 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -23,7 +23,9 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
+import accord.primitives.Ranges;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PreHashedDecoratedKey;
 import org.apache.cassandra.db.TypeSizes;
@@ -58,6 +60,8 @@ public class Murmur3Partitioner implements IPartitioner
 
     private final Splitter splitter = new Splitter(this)
     {
+        final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE), MIN = 
BigInteger.valueOf(Long.MIN_VALUE);
+
         public Token tokenForValue(BigInteger value)
         {
             return new LongToken(value.longValue());
@@ -67,6 +71,18 @@ public class Murmur3Partitioner implements IPartitioner
         {
             return BigInteger.valueOf(((LongToken) token).token);
         }
+
+        @Override
+        BigInteger minimumValue()
+        {
+            return MIN;
+        }
+
+        @Override
+        BigInteger maximumValue()
+        {
+            return MAX;
+        }
     };
 
     public DecoratedKey decorateKey(ByteBuffer key)
@@ -448,4 +464,10 @@ public class Murmur3Partitioner implements IPartitioner
     {
         return Optional.of(splitter);
     }
+
+    @Override
+    public Function<Ranges, AccordSplitter> accordSplitter()
+    {
+        return ignore -> splitter; // the ranges argument is unused; the partitioner's existing splitter is handed back as-is
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java 
b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 18cd94c388..edceee7d66 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -22,7 +22,10 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
+import accord.api.RoutingKey;
+import accord.primitives.Ranges;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.CachedHashDecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -32,6 +35,7 @@ import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
@@ -40,6 +44,11 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.Pair;
 
+import static accord.utils.Invariants.checkArgument;
+import static java.lang.Integer.max;
+import static java.math.BigInteger.ONE;
+import static java.math.BigInteger.ZERO;
+
 public class OrderPreservingPartitioner implements IPartitioner
 {
     private static final String rndchars = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
@@ -81,7 +90,7 @@ public class OrderPreservingPartitioner implements 
IPartitioner
     {
         assert str.length() <= sigchars;
 
-        BigInteger big = BigInteger.ZERO;
+        BigInteger big = ZERO;
         for (int i = 0; i < str.length(); i++)
         {
             int charpos = 16 * (sigchars - (i + 1));
@@ -276,4 +285,67 @@ public class OrderPreservingPartitioner implements IPartitioner
     {
         return UTF8Type.instance;
     }
+
+    /**
+     * Builds a splitter that reads {@link StringToken}s as unsigned big-endian integers made of 16-bit chars.
+     * The width, in chars, is the longest token among the start and end bounds of the supplied ranges
+     * (zero when there are no ranges); shorter tokens are padded on the right with zero chars.
+     */
+    @Override
+    public Function<Ranges, AccordSplitter> accordSplitter()
+    {
+        return ranges -> new AccordSplitter()
+        {
+            final int charLength = ranges.stream().mapToInt(range -> max(charLength(range.start()), charLength(range.end())))
+                                         .max().orElse(0);
+
+            @Override
+            BigInteger valueForToken(Token token)
+            {
+                String chars = ((StringToken) token).token;
+                checkArgument(chars.length() <= charLength);
+                BigInteger value = ZERO;
+                // char i is placed at bit offset 16 * (charLength - 1 - i)
+                for (int i = 0 ; i < chars.length() ; ++i)
+                    value = value.add(BigInteger.valueOf(chars.charAt(i) & 0xffffL).shiftLeft((charLength - 1 - i) * 16));
+                return value;
+            }
+
+            @Override
+            Token tokenForValue(BigInteger value)
+            {
+                // TODO (required): test
+                checkArgument(value.compareTo(ZERO) >= 0);
+                char[] chars = new char[charLength];
+                // shortValue() keeps the low 16 bits of each shifted value, i.e. one char per position
+                for (int i = 0 ; i < chars.length ; ++i)
+                    chars[i] = (char) value.shiftRight((charLength - 1 - i) * 16).shortValue();
+                return new StringToken(new String(chars));
+            }
+
+            @Override
+            BigInteger minimumValue()
+            {
+                return ZERO;
+            }
+
+            @Override
+            BigInteger maximumValue()
+            {
+                return ONE.shiftLeft(charLength * 16).subtract(ONE);
+            }
+        };
+    }
+
+    /** Length in chars of the token behind a routing key. */
+    private static int charLength(RoutingKey routingKey)
+    {
+        return charLength(((AccordRoutingKey) routingKey).token());
+    }
+
+    /** Length in chars of a {@link StringToken}. */
+    private static int charLength(Token token)
+    {
+        return ((StringToken) token).token.length();
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java 
b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index 7d1e7505a1..180726348d 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -23,9 +23,11 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import accord.primitives.Ranges;
 import org.apache.cassandra.db.CachedHashDecoratedKey;
 import org.apache.cassandra.db.marshal.ByteArrayAccessor;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
@@ -92,6 +94,18 @@ public class RandomPartitioner implements IPartitioner
         {
             return ((BigIntegerToken)token).getTokenValue();
         }
+
+        @Override
+        BigInteger minimumValue()
+        {
+            return MINIMUM.getTokenValue();
+        }
+
+        @Override
+        BigInteger maximumValue()
+        {
+            return MAXIMUM;
+        }
     };
 
     public DecoratedKey decorateKey(ByteBuffer key)
@@ -368,6 +382,12 @@ public class RandomPartitioner implements IPartitioner
         return Optional.of(splitter);
     }
 
+    @Override
+    public Function<Ranges, AccordSplitter> accordSplitter()
+    {
+        return ignore -> splitter; // the ranges argument is unused; the partitioner's existing splitter is handed back as-is
+    }
+
     private static BigInteger hashToBigInteger(ByteBuffer data)
     {
         MessageDigest messageDigest = localMD5Digest.get();
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java 
b/src/java/org/apache/cassandra/dht/Splitter.java
index e410a9cb29..91782f467a 100644
--- a/src/java/org/apache/cassandra/dht/Splitter.java
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -36,7 +36,7 @@ import static java.util.stream.Collectors.toSet;
 /**
  * Partition splitter.
  */
-public abstract class Splitter
+public abstract class Splitter extends AccordSplitter
 {
     private final IPartitioner partitioner;
 
@@ -45,12 +45,6 @@ public abstract class Splitter
         this.partitioner = partitioner;
     }
 
-    @VisibleForTesting
-    protected abstract Token tokenForValue(BigInteger value);
-
-    @VisibleForTesting
-    protected abstract BigInteger valueForToken(Token token);
-
     @VisibleForTesting
     protected BigInteger tokensInRange(Range<Token> range)
     {
diff --git a/src/java/org/apache/cassandra/schema/TableId.java 
b/src/java/org/apache/cassandra/schema/TableId.java
index 53b4465bf3..cc4fd39c39 100644
--- a/src/java/org/apache/cassandra/schema/TableId.java
+++ b/src/java/org/apache/cassandra/schema/TableId.java
@@ -131,7 +131,7 @@ public class TableId implements Comparable<TableId>
         return position - offset;
     }
 
-    public static int serializedSize()
+    public final int serializedSize()
     {
         return 16;
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 2b4f36863d..2003e77ae1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -56,6 +56,7 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.DeterministicIdentitySet;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.store.StoredNavigableMap;
@@ -342,7 +343,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
         blockingCommitOn.clearModifiedFlag();
         waitingOnApply.clearModifiedFlag();
         blockingApplyOn.clearModifiedFlag();
-        storedListeners.clearModifiedFlag();;
+        storedListeners.clearModifiedFlag();
     }
 
     @Override
@@ -628,7 +629,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
     @Override
     protected void postApply(SafeCommandStore safeStore)
     {
-        AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((AccordCommandStore) safeStore).commandCache();
+        AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((SafeAccordCommandStore) safeStore).commandStore().commandCache();
         cache.cleanupWriteFuture(txnId);
         super.postApply(safeStore);
     }
@@ -640,10 +641,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
         for (int i=0,mi=keys.size(); i<mi; i++)
         {
             PartitionKey key = (PartitionKey) keys.get(i);
-            if (((AccordCommandStore)safeStore).isCommandsForKeyInContext(key))
-                continue;
-
-            if (!safeStore.commandStore().hashIntersects(key))
+            if (((SafeAccordCommandStore) 
safeStore).commandStore().isCommandsForKeyInContext(key))
                 continue;
 
             if (!ranges.contains(key))
@@ -672,7 +670,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
 
     private Future<Void> apply(SafeCommandStore safeStore, boolean 
canReschedule)
     {
-        AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((AccordCommandStore) safeStore).commandCache();
+        AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((SafeAccordCommandStore) safeStore).commandStore().commandCache();
         Future<Void> future = cache.getWriteFuture(txnId);
         if (future != null)
             return future;
@@ -700,7 +698,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
     @Override
     public Future<Data> read(SafeCommandStore safeStore)
     {
-        AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((AccordCommandStore) safeStore).commandCache();
+        AccordStateCache.Instance<TxnId, AccordCommand> cache = 
((SafeAccordCommandStore) safeStore).commandStore().commandCache();
         Future<Data> future = cache.getReadFuture(txnId);
         if (future != null)
             return future;
@@ -756,7 +754,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
         storedListeners.getView().forEach(l -> l.onChange(safeStore, this));
         transientListeners.forEach(listener -> {
             PreLoadContext ctx = listener.listenerPreLoadContext(txnId());
-            AsyncContext context = 
((AccordCommandStore)safeStore).getContext();
+            AsyncContext context = 
((SafeAccordCommandStore)safeStore).context();
             if (context.containsScopedItems(ctx))
             {
                 logger.trace("{}: synchronously updating listener {}", 
txnId(), listener);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 068b08eff8..f2b54492fa 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -27,8 +27,6 @@ import java.util.function.BinaryOperator;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import com.google.common.base.Preconditions;
-
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.Key;
@@ -36,6 +34,8 @@ import accord.api.ProgressLog;
 import accord.local.Command;
 import accord.local.CommandListener;
 import accord.local.CommandStore;
+import accord.local.CommandStores.RangesForEpoch;
+import accord.local.CommandStores.RangesForEpochHolder;
 import accord.local.CommandsForKey;
 import accord.local.NodeTimeService;
 import accord.local.PreLoadContext;
@@ -48,6 +48,7 @@ import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.AbstractKeys;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.async.AsyncOperation;
@@ -55,11 +56,257 @@ import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-public class AccordCommandStore extends CommandStore implements SafeCommandStore
+import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+
+public class AccordCommandStore extends CommandStore
 {
-    public static long maxCacheSize()
+    public class SafeAccordCommandStore implements SafeCommandStore
     {
-        return 5 << 20; // TODO: make configurable
+        final RangesForEpoch rangesForEpoch;
+        final AsyncContext context;
+
+        SafeAccordCommandStore(RangesForEpoch rangesForEpoch, AsyncContext 
context)
+        {
+            this.rangesForEpoch = rangesForEpoch;
+            this.context = context;
+        }
+
+        public AsyncContext context()
+        {
+            return context;
+        }
+
+        @Override
+        public Command command(TxnId txnId)
+        {
+            AccordCommand command = getCommandInternal(txnId);
+            if (command.isEmpty())
+                command.initialize();
+            return command;
+        }
+
+        @Override
+        public Command ifPresent(TxnId txnId)
+        {
+            AccordCommand command = getCommandInternal(txnId);
+            return !command.isEmpty() ? command : null;
+        }
+
+        @Override
+        public Command ifLoaded(TxnId txnId)
+        {
+            AccordCommand command = commandCache.getOrNull(txnId);
+            if (command != null && command.isLoaded())
+            {
+                getContext().commands.add(command);
+                return command;
+            }
+            return null;
+        }
+
+        @Override
+        public CommandsForKey commandsForKey(Key key)
+        {
+            AccordCommandsForKey commandsForKey = 
getCommandsForKeyInternal(key);
+            if (commandsForKey.isEmpty())
+                commandsForKey.initialize();
+            return commandsForKey;
+        }
+
+        @Override
+        public CommandsForKey maybeCommandsForKey(Key key)
+        {
+            AccordCommandsForKey commandsForKey = 
getCommandsForKeyInternal(key);
+            return !commandsForKey.isEmpty() ? commandsForKey : null;
+        }
+
+        @Override
+        public void addAndInvokeListener(TxnId txnId, CommandListener listener)
+        {
+            AccordCommand.WriteOnly command = (AccordCommand.WriteOnly) 
getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new 
AccordCommand.WriteOnly(id), commandStore());
+            command.addListener(listener);
+            execute(listener.listenerPreLoadContext(txnId), store -> {
+                listener.onChange(store, store.command(txnId));
+            });
+        }
+
+        @Override
+        public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+        {
+            switch (keysOrRanges.kindOfContents()) {
+                default:
+                    throw new AssertionError();
+                case Key:
+                    // TODO: efficiency
+                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                    return keys.stream()
+                               .filter(slice::contains)
+                               .map(this::commandsForKey)
+                               .map(map)
+                               .reduce(initialValue, reduce);
+                case Range:
+                    // TODO: implement
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+        {
+            switch (keysOrRanges.kindOfContents()) {
+                default:
+                    throw new AssertionError();
+                case Key:
+                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                    return keys.stream()
+                               .map(this::commandsForKey)
+                               .map(map)
+                               .reduce(initialValue, reduce);
+                case Range:
+                    // TODO: implement
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        public void forEach(Routables<?, ?> keysOrRanges, 
Consumer<CommandsForKey> forEach)
+        {
+            switch (keysOrRanges.kindOfContents()) {
+                default:
+                    throw new AssertionError();
+                case Key:
+                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                    keys.forEach(key -> forEach.accept(commandsForKey(key)));
+                    break;
+                case Range:
+                    // TODO: implement
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        public void forEach(Routable keyOrRange, Consumer<CommandsForKey> 
forEach)
+        {
+            switch (keyOrRange.kind())
+            {
+                default: throw new AssertionError();
+                case Key:
+                    forEach.accept(commandsForKey((Key) keyOrRange));
+                    break;
+                case Range:
+                    // TODO: implement
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        @Override
+        public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, 
Consumer<CommandsForKey> forEach)
+        {
+            switch (keysOrRanges.kindOfContents()) {
+                default:
+                    throw new AssertionError();
+                case Key:
+                    AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
+                    keys.forEach(slice, key -> {
+                        forEach.accept(commandsForKey(key));
+                    });
+                    break;
+                case Range:
+                    // TODO: implement
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        @Override
+        public void forEach(Routable keyOrRange, Ranges slice, 
Consumer<CommandsForKey> forEach)
+        {
+            switch (keyOrRange.kind())
+            {
+                default: throw new AssertionError();
+                case Key:
+                    Key key = (Key) keyOrRange;
+                    if (slice.contains(key))
+                        forEach.accept(commandsForKey(key));
+                    break;
+                case Range:
+                    // TODO: implement
+                    throw new UnsupportedOperationException();
+            }
+        }
+
+        @Override
+        public AccordCommandStore commandStore()
+        {
+            return AccordCommandStore.this;
+        }
+
+        @Override
+        public DataStore dataStore()
+        {
+            return dataStore;
+        }
+
+        @Override
+        public Agent agent()
+        {
+            return agent;
+        }
+
+        @Override
+        public ProgressLog progressLog()
+        {
+            return progressLog;
+        }
+
+        @Override
+        public RangesForEpoch ranges()
+        {
+            return rangesForEpoch;
+        }
+
+        @Override
+        public long latestEpoch()
+        {
+            return time.epoch();
+        }
+
+        @Override
+        public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys)
+        {
+            Timestamp max = maxConflict(keys);
+            long epoch = latestEpoch();
+            if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch && 
!agent.isExpired(txnId, time.now()))
+                return txnId;
+
+            return time.uniqueNow(max);
+        }
+
+        @Override
+        public Future<Void> execute(PreLoadContext context, Consumer<? super 
SafeCommandStore> consumer)
+        {
+            return AccordCommandStore.this.execute(context, consumer);
+        }
+
+        @Override
+        public <T> Future<T> submit(PreLoadContext context, Function<? super 
SafeCommandStore, T> function)
+        {
+            return AccordCommandStore.this.submit(context, function);
+        }
+
+        @Override
+        public NodeTimeService time()
+        {
+            return time;
+        }
+
+        public Timestamp maxConflict(Seekables<?, ?> keys)
+        {
+            // TODO: Seekables
+            // TODO: efficiency
+            return ((Keys)keys).stream()
+                               .map(this::maybeCommandsForKey)
+                               .filter(Objects::nonNull)
+                               .map(CommandsForKey::max)
+                               .max(Comparator.naturalOrder())
+                               .orElse(Timestamp.NONE);
+        }
     }
 
     private static long getThreadId(ExecutorService executor)
@@ -91,29 +338,25 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
     private final Agent agent;
     private final DataStore dataStore;
     private final ProgressLog progressLog;
-    private final RangesForEpoch rangesForEpoch;
+    private final RangesForEpochHolder rangesForEpochHolder;
 
     public AccordCommandStore(int id,
-                              int generation,
-                              int index,
-                              int numShards,
                               NodeTimeService time,
                               Agent agent,
                               DataStore dataStore,
                               ProgressLog.Factory progressLogFactory,
-                              RangesForEpoch rangesForEpoch,
-                              ExecutorService executor)
+                              RangesForEpochHolder rangesForEpoch)
     {
-        super(id, generation, index, numShards);
+        super(id);
         this.time = time;
         this.agent = agent;
         this.dataStore = dataStore;
         this.progressLog = progressLogFactory.create(this);
-        this.rangesForEpoch = rangesForEpoch;
-        this.loggingId = String.format("[%s:%s]", generation, index);
-        this.executor = executor;
+        this.rangesForEpochHolder = rangesForEpoch;
+        this.loggingId = String.format("[%s]", id);
+        this.executor = executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + ']');
         this.threadId = getThreadId(this.executor);
-        this.stateCache = new AccordStateCache(maxCacheSize() / numShards);
+        this.stateCache = new AccordStateCache(0);
         this.commandCache = stateCache.instance(TxnId.class,
                                                 AccordCommand.class,
                                                 AccordCommand::new);
@@ -128,14 +371,19 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         stateCache.setMaxSize(bytes);
     }
 
+    public SafeAccordCommandStore safeStore(AsyncContext context)
+    {
+        return new SafeAccordCommandStore(rangesForEpochHolder.get(), context);
+    }
+
     public void checkInStoreThread()
     {
-        Preconditions.checkState(Thread.currentThread().getId() == threadId);
+        Invariants.checkState(Thread.currentThread().getId() == threadId);
     }
 
     public void checkNotInStoreThread()
     {
-        Preconditions.checkState(Thread.currentThread().getId() != threadId);
+        Invariants.checkState(Thread.currentThread().getId() != threadId);
     }
 
     public ExecutorService executor()
@@ -155,19 +403,19 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
 
     public void setContext(AsyncContext context)
     {
-        Preconditions.checkState(currentCtx == null);
+        Invariants.checkState(currentCtx == null);
         currentCtx = context;
     }
 
     public AsyncContext getContext()
     {
-        Preconditions.checkState(currentCtx != null);
+        Invariants.checkState(currentCtx != null);
         return currentCtx;
     }
 
     public void unsetContext(AsyncContext context)
     {
-        Preconditions.checkState(currentCtx == context);
+        Invariants.checkState(currentCtx == context);
         currentCtx = null;
     }
 
@@ -179,44 +427,14 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
 
     private AccordCommand getCommandInternal(TxnId txnId)
     {
-        Preconditions.checkState(currentCtx != null);
+        Invariants.checkState(currentCtx != null);
         AccordCommand command = currentCtx.commands.get(txnId);
         if (command == null)
             throw new IllegalArgumentException("No command in context for 
txnId " + txnId);
-
-        Preconditions.checkState(command.isLoaded() || (command.isReadOnly() 
&& command.isPartiallyLoaded()));
-
-        return command;
-    }
-
-    @Override
-    public Command command(TxnId txnId)
-    {
-        AccordCommand command = getCommandInternal(txnId);
-        if (command.isEmpty())
-            command.initialize();
+        Invariants.checkState(command.isLoaded() || (command.isReadOnly() && 
command.isPartiallyLoaded()));
         return command;
     }
 
-    @Override
-    public Command ifPresent(TxnId txnId)
-    {
-        AccordCommand command = getCommandInternal(txnId);
-        return !command.isEmpty() ? command : null;
-    }
-
-    @Override
-    public Command ifLoaded(TxnId txnId)
-    {
-        AccordCommand command = commandCache.getOrNull(txnId);
-        if (command != null && command.isLoaded())
-        {
-            getContext().commands.add(command);
-            return command;
-        }
-        return null;
-    }
-
     public boolean isCommandsForKeyInContext(PartitionKey key)
     {
         return currentCtx.commandsForKey.get(key) != null;
@@ -230,102 +448,10 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         AccordCommandsForKey commandsForKey = 
currentCtx.commandsForKey.get((PartitionKey) key);
         if (commandsForKey == null)
             throw new IllegalArgumentException("No commandsForKey in context 
for key " + key);
-        Preconditions.checkState(commandsForKey.isLoaded());
+        Invariants.checkState(commandsForKey.isLoaded());
         return commandsForKey;
     }
 
-    @Override
-    public CommandsForKey commandsForKey(Key key)
-    {
-        AccordCommandsForKey commandsForKey = getCommandsForKeyInternal(key);
-        if (commandsForKey.isEmpty())
-            commandsForKey.initialize();
-        return commandsForKey;
-    }
-
-    @Override
-    public CommandsForKey maybeCommandsForKey(Key key)
-    {
-        AccordCommandsForKey commandsForKey = getCommandsForKeyInternal(key);
-        return !commandsForKey.isEmpty() ? commandsForKey : null;
-    }
-
-    @Override
-    public void addAndInvokeListener(TxnId txnId, CommandListener listener)
-    {
-        AccordCommand.WriteOnly command = (AccordCommand.WriteOnly) 
getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new 
AccordCommand.WriteOnly(id), this);
-        command.addListener(listener);
-        execute(listener.listenerPreLoadContext(txnId), store -> {
-            listener.onChange(store, store.command(txnId));
-        });
-    }
-
-    public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
-    {
-        switch (keysOrRanges.kindOfContents()) {
-            default:
-                throw new AssertionError();
-            case Key:
-                // TODO: efficiency
-                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                return keys.stream()
-                           .filter(slice::contains)
-                           .filter(this::hashIntersects)
-                           .map(this::commandsForKey)
-                           .map(map)
-                           .reduce(initialValue, reduce);
-            case Range:
-                // TODO:
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, 
Consumer<CommandsForKey> forEach)
-    {
-        switch (keysOrRanges.kindOfContents()) {
-            default:
-                throw new AssertionError();
-            case Key:
-                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                keys.forEach(slice, key -> {
-                    if (hashIntersects(key))
-                        forEach.accept(commandsForKey(key));
-                });
-                break;
-            case Range:
-                // TODO:
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    public void forEach(Routable keyOrRange, Ranges slice, 
Consumer<CommandsForKey> forEach)
-    {
-        switch (keyOrRange.kind())
-        {
-            default: throw new AssertionError();
-            case Key:
-                Key key = (Key) keyOrRange;
-                if (slice.contains(key))
-                    forEach.accept(commandsForKey(key));
-                break;
-            case Range:
-                // TODO:
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    @Override
-    public CommandStore commandStore()
-    {
-        return this;
-    }
-
-    @Override
-    public DataStore dataStore()
-    {
-        return dataStore;
-    }
-
     @Override
     public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super 
SafeCommandStore, T> function)
     {
@@ -340,53 +466,6 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         return agent;
     }
 
-    @Override
-    public ProgressLog progressLog()
-    {
-        return progressLog;
-    }
-
-    @Override
-    public RangesForEpoch ranges()
-    {
-        return rangesForEpoch;
-    }
-
-    @Override
-    public long latestEpoch()
-    {
-        return time.epoch();
-    }
-
-    @Override
-    public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys)
-    {
-        Timestamp max = maxConflict(keys);
-        long epoch = latestEpoch();
-        if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch && 
!agent.isExpired(txnId, time.now()))
-            return txnId;
-
-        return time.uniqueNow(max);
-    }
-
-    @Override
-    public NodeTimeService time()
-    {
-        return time;
-    }
-
-    public Timestamp maxConflict(Seekables<?, ?> keys)
-    {
-        // TODO: Seekables
-        // TODO: efficiency
-        return ((Keys)keys).stream()
-                   .map(this::maybeCommandsForKey)
-                   .filter(Objects::nonNull)
-                   .map(CommandsForKey::max)
-                   .max(Comparator.naturalOrder())
-                   .orElse(Timestamp.NONE);
-    }
-
     @Override
     public Future<Void> execute(PreLoadContext preLoadContext, Consumer<? 
super SafeCommandStore> consumer)
     {
@@ -411,58 +490,9 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore
         }
     }
 
-    public <T> T mapReduce(Routables<?, ?> keysOrRanges, 
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) {
-        switch (keysOrRanges.kindOfContents()) {
-            default:
-                throw new AssertionError();
-            case Key:
-                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                return keys.stream()
-                           .filter(this::hashIntersects)
-                           .map(this::commandsForKey)
-                           .map(map)
-                           .reduce(initialValue, reduce);
-            case Range:
-                // TODO: implement
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    public void forEach(Routables<?, ?> keysOrRanges, Consumer<CommandsForKey> 
forEach)
-    {
-        switch (keysOrRanges.kindOfContents()) {
-            default:
-                throw new AssertionError();
-            case Key:
-                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) 
keysOrRanges;
-                keys.forEach(key -> {
-                    if (hashIntersects(key))
-                        forEach.accept(commandsForKey(key));
-                });
-                break;
-            case Range:
-                // TODO: implement
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    public void forEach(Routable keyOrRange, Consumer<CommandsForKey> forEach)
-    {
-        switch (keyOrRange.kind())
-        {
-            default: throw new AssertionError();
-            case Key:
-                forEach.accept(commandsForKey((Key) keyOrRange));
-                break;
-            case Range:
-                // TODO: implement
-                throw new UnsupportedOperationException();
-        }
-    }
-
     @Override
     public void shutdown()
     {
-        // executors are shutdown by AccordCommandStores
+        executor.shutdown();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 24b77bcc5c..14dd0851c2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -18,50 +18,50 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.concurrent.ExecutorService;
-
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.ProgressLog;
 import accord.local.AsyncCommandStores;
-import accord.local.CommandStore;
-import accord.local.Node;
 import accord.local.NodeTimeService;
-import org.apache.cassandra.concurrent.ExecutorFactory;
-import org.apache.cassandra.utils.ExecutorUtils;
+import accord.local.ShardDistributor;
+import accord.topology.Topology;
+import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 
 public class AccordCommandStores extends AsyncCommandStores
 {
-    private final ExecutorService[] executors;
+    private long cacheSize;
+    AccordCommandStores(NodeTimeService time, Agent agent, DataStore store,
+                        ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory)
+    {
+        super(time, agent, store, shardDistributor, progressLogFactory, 
AccordCommandStore::new);
+        setCacheSize(maxCacheSize());
+    }
 
-    public AccordCommandStores(int numShards, Node node, Agent agent, 
DataStore store,
-                               ProgressLog.Factory progressLogFactory)
+    synchronized void setCacheSize(long bytes)
     {
-        this(numShards, node, agent, store, progressLogFactory, 
executors(node, numShards));
+        cacheSize = bytes;
+        refreshCacheSizes();
     }
 
-    private AccordCommandStores(int numShards, NodeTimeService time, Agent 
agent, DataStore store,
-                                ProgressLog.Factory progressLogFactory, 
ExecutorService[] executors)
+    synchronized void refreshCacheSizes()
     {
-        super(numShards, time, agent, store, progressLogFactory,
-              (id, generation, index, numShards1, time1, agent1, store1, 
progressLogFactory1, rangesForEpoch)
-                -> new AccordCommandStore(id, generation, index, numShards1, 
time1, agent1, store1, progressLogFactory1, rangesForEpoch, executors[index]));
-        this.executors = executors;
+        if (count() == 0)
+            return;
+        long perStore = cacheSize / count();
+        // TODO (low priority, safety): we might transiently breach our limit 
if we increase one store before decreasing another
+        forEach(commandStore -> ((SafeAccordCommandStore) 
commandStore).commandStore().setCacheSize(perStore));
     }
 
-    private static ExecutorService[] executors(Node node, int count)
+    private static long maxCacheSize()
     {
-        ExecutorService[] executors = new ExecutorService[count];
-        for (int i=0; i<count; i++)
-        {
-            executors[i] = ExecutorFactory.Global.executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + node + ':' + i + ']');
-        }
-        return executors;
+        return 5 << 20; // TODO (required): make configurable
     }
 
-    void setCacheSize(long bytes)
+    @Override
+    public synchronized void updateTopology(Topology newTopology)
     {
-        forEach(commandStore -> ((AccordCommandStore) 
commandStore).setCacheSize(bytes));
+        super.updateTopology(newTopology);
+        refreshCacheSizes();
     }
 
     @Override
@@ -69,6 +69,5 @@ public class AccordCommandStores extends AsyncCommandStores
     {
         super.shutdown();
         //TODO shutdown isn't useful by itself, we need a way to "wait" as well.  Should be AutoCloseable or offer awaitTermination as well (think Shutdownable interface)
-        ExecutorUtils.shutdown(executors);
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index f3382e17cf..12da1ffc10 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -133,8 +133,7 @@ public class AccordKeyspace
         parse(COMMANDS,
               "accord commands",
               "CREATE TABLE %s ("
-              + "store_generation int,"
-              + "store_index int,"
+              + "store_id int,"
               + format("txn_id %s,", TIMESTAMP_TUPLE)
               + "status int,"
               + "home_key blob,"
@@ -154,7 +153,7 @@ public class AccordKeyspace
               + "listeners set<blob>, "
               + format("blocking_commit_on set<%s>, ", TIMESTAMP_TUPLE)
               + format("blocking_apply_on set<%s>, ", TIMESTAMP_TUPLE)
-              + "PRIMARY KEY((store_generation, store_index, txn_id))"
+              + "PRIMARY KEY((store_id, txn_id))"
               + ')');
 
     // TODO: naming is not very clearly distinct from the base serializers
@@ -208,8 +207,7 @@ public class AccordKeyspace
         parse(COMMANDS_FOR_KEY,
               "accord commands per key",
               "CREATE TABLE %s ("
-              + "store_generation int, "
-              + "store_index int, "
+              + "store_id int, "
               + format("key %s, ", KEY_TUPLE)
               + format("max_timestamp %s static, ", TIMESTAMP_TUPLE)
               + format("last_executed_timestamp %s static, ", TIMESTAMP_TUPLE)
@@ -219,7 +217,7 @@ public class AccordKeyspace
               + "series int, "
               + format("timestamp %s, ", TIMESTAMP_TUPLE)
               + "data blob, "
-              + "PRIMARY KEY((store_generation, store_index, key), series, 
timestamp)"
+              + "PRIMARY KEY((store_id, key), series, timestamp)"
               + ')');
 
     private static class CommandsForKeyColumns
@@ -504,8 +502,7 @@ public class AccordKeyspace
                                     timestampMicros, nowInSeconds, 
command.storedListeners,
                                     ListenerProxy::identifier);
             }
-            ByteBuffer key = 
CommandsColumns.keyComparator.make(commandStore.generation(),
-                                                                
commandStore.index(),
+            ByteBuffer key = 
CommandsColumns.keyComparator.make(commandStore.id(),
                                                                 
serializeTimestamp(command.txnId())).serializeAsPartitionKey();
             PartitionUpdate update = PartitionUpdate.singleRowUpdate(Commands, 
key, builder.build());
             return new Mutation(update);
@@ -563,13 +560,11 @@ public class AccordKeyspace
     public static UntypedResultSet loadCommandRow(CommandStore commandStore, 
TxnId txnId)
     {
         String cql = "SELECT * FROM %s.%s " +
-                     "WHERE store_generation=? " +
-                     "AND store_index=? " +
+                     "WHERE store_id = ? " +
                      "AND txn_id=(?, ?, ?, ?)";
 
         return executeOnceInternal(String.format(cql, ACCORD_KEYSPACE_NAME, 
COMMANDS),
-                                   commandStore.generation(),
-                                   commandStore.index(),
+                                   commandStore.id(),
                                    txnId.epoch, txnId.real, txnId.logical, 
txnId.node.id);
     }
 
@@ -650,8 +645,7 @@ public class AccordKeyspace
 
     private static DecoratedKey makeKey(CommandStore commandStore, 
PartitionKey key)
     {
-        ByteBuffer pk = 
CommandsForKeyColumns.keyComparator.make(commandStore.generation(),
-                                                                  
commandStore.index(),
+        ByteBuffer pk = 
CommandsForKeyColumns.keyComparator.make(commandStore.id(),
                                                                   
serializeKey(key)).serializeAsPartitionKey();
         return CommandsForKey.partitioner.decorateKey(pk);
     }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 983641d5a9..7f7a8a86e3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -40,7 +40,6 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
 import accord.primitives.Writes;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
@@ -62,7 +61,7 @@ public class AccordObjectSizes
         return ((AccordRoutingKey) key).estimatedSizeOnHeap();
     }
 
-    private static final long EMPTY_KEY_RANGE_SIZE = 
ObjectSizes.measure(TokenRange.fullRange(TableId.generate()));
+    private static final long EMPTY_KEY_RANGE_SIZE = 
ObjectSizes.measure(TokenRange.fullRange(""));
     public static long range(Range range)
     {
         return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end());
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
index 3eb3b9705e..dc48d4bae7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java
@@ -168,7 +168,7 @@ public class AccordSerializers
         @Override
         public long serializedSize(TableMetadata metadata, int version)
         {
-            return TableId.serializedSize();
+            return metadata.id.serializedSize();
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index fcc8a8cebc..0e99689d5a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -31,6 +31,7 @@ import accord.coordinate.Timeout;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.local.Node;
+import accord.local.ShardDistributor.EvenSplit;
 import accord.messages.Request;
 import accord.primitives.Txn;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -41,6 +42,7 @@ import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.service.accord.api.AccordAgent;
+import 
org.apache.cassandra.service.accord.api.AccordRoutingKey.KeyspaceSplitter;
 import org.apache.cassandra.service.accord.api.AccordScheduler;
 import org.apache.cassandra.service.accord.txn.TxnData;
 import org.apache.cassandra.utils.Clock;
@@ -49,6 +51,9 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
+import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+
 public class AccordService implements Shutdownable
 {
     public final Node node;
@@ -84,6 +89,7 @@ public class AccordService implements Shutdownable
                              configService,
                              AccordService::uniqueNow,
                              () -> null,
+                             new KeyspaceSplitter(new 
EvenSplit<>(getConcurrentAccordOps(), getPartitioner().accordSplitter())),
                              new AccordAgent(),
                              new Random(),
                              scheduler,
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java 
b/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java
index 22bd631e31..bc6ec7e7b6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java
@@ -25,8 +25,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import com.google.common.base.Preconditions;
-
 import accord.topology.Shard;
 import accord.topology.Topology;
 import org.apache.cassandra.db.Keyspace;
@@ -35,9 +33,6 @@ import 
org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.SchemaConstants;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
@@ -52,26 +47,23 @@ public class AccordTopologyUtils
                          
pending.stream().map(EndpointMapping::getId).collect(Collectors.toSet()));
     }
 
-    private static TokenRange minRange(TableId tableId, Token token)
+    private static TokenRange minRange(String keyspace, Token token)
     {
-        return new TokenRange(SentinelKey.min(tableId), new TokenKey(tableId, 
token));
+        return new TokenRange(SentinelKey.min(keyspace), new 
TokenKey(keyspace, token));
     }
 
-    private static TokenRange maxRange(TableId tableId, Token token)
+    private static TokenRange maxRange(String keyspace, Token token)
     {
-        return new TokenRange(new TokenKey(tableId, token), 
SentinelKey.max(tableId));
+        return new TokenRange(new TokenKey(keyspace, token), 
SentinelKey.max(keyspace));
     }
 
-    private static TokenRange range(TableId tableId, Token left, Token right)
+    private static TokenRange range(String keyspace, Token left, Token right)
     {
-        return new TokenRange(new TokenKey(tableId, left), new 
TokenKey(tableId, right));
+        return new TokenRange(new TokenKey(keyspace, left), new 
TokenKey(keyspace, right));
     }
 
-    public static List<Shard> createShards(TableMetadata tableMetadata, 
TokenMetadata tokenMetadata)
+    public static List<Shard> createShards(String keyspace, TokenMetadata 
tokenMetadata)
     {
-        TableId tableId = tableMetadata.id;
-        String keyspace = tableMetadata.keyspace;
-
         AbstractReplicationStrategy replication = 
Keyspace.open(keyspace).getReplicationStrategy();
         Set<Token> tokenSet = new HashSet<>(tokenMetadata.sortedTokens());
         tokenSet.addAll(tokenMetadata.getBootstrapTokens().keySet());
@@ -88,13 +80,13 @@ public class AccordTopologyUtils
             EndpointsForToken pending = 
tokenMetadata.pendingEndpointsForToken(token, keyspace);
             if (i == 0)
             {
-                shards.add(createShard(minRange(tableId, token), natural, 
pending));
-                finalShard = createShard(maxRange(tableId, tokens.get(mi-1)), 
natural, pending);
+                shards.add(createShard(minRange(keyspace, token), natural, 
pending));
+                finalShard = createShard(maxRange(keyspace, tokens.get(mi-1)), 
natural, pending);
             }
             else
             {
                 Token prev = tokens.get(i - 1);
-                shards.add(createShard(range(tableId, prev, token), natural, 
pending));
+                shards.add(createShard(range(keyspace, prev, token), natural, 
pending));
             }
         }
         shards.add(finalShard);
@@ -104,31 +96,11 @@ public class AccordTopologyUtils
 
     public static Topology createTopology(long epoch)
     {
-        List<TableId> tableIds = new ArrayList<>();
         TokenMetadata tokenMetadata = 
StorageService.instance.getTokenMetadata();
-        for (String ksname: Schema.instance.getKeyspaces())
-        {
-            // TODO: add a table metadata flag to enable and enforce accord use
-            if (SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES.contains(ksname))
-                continue;
-            if 
(SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(ksname))
-                continue;
-
-            Keyspace keyspace = Keyspace.open(ksname);
-            for (TableMetadata tableMetadata : keyspace.getMetadata().tables)
-            {
-                tableIds.add(tableMetadata.id);
-            }
-        }
-
-        tableIds.sort(Comparator.naturalOrder());
-
         List<Shard> shards = new ArrayList<>();
-        for (TableId tableId : tableIds)
+        for (String keyspace : Schema.instance.distributedKeyspaces().names())
         {
-            TableMetadata tableMetadata = 
Schema.instance.getTableMetadata(tableId);
-            Preconditions.checkNotNull(tableMetadata);
-            shards.addAll(createShards(tableMetadata, tokenMetadata));
+            shards.addAll(createShards(keyspace, tokenMetadata));
         }
 
         return new Topology(epoch, shards.toArray(new Shard[0]));
diff --git a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java 
b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
index bd9061d350..ea7a74c0c6 100644
--- a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
+++ b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java
@@ -34,6 +34,7 @@ import accord.primitives.Keys;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.ValueAccessor;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
@@ -128,8 +129,8 @@ public abstract class ListenerProxy implements 
CommandListener, Comparable<Liste
         public void onChange(SafeCommandStore safeStore, Command c)
         {
             AccordCommand command = (AccordCommand) c;
-            AccordCommandStore commandStore = (AccordCommandStore) safeStore;
-            AsyncContext context = commandStore.getContext();
+            SafeAccordCommandStore commandStore = (SafeAccordCommandStore) 
safeStore;
+            AsyncContext context = commandStore.context();
             PreLoadContext loadCtx = 
PreLoadContext.contextFor(ImmutableList.of(command.txnId(), txnId), Keys.EMPTY);
             if (context.containsScopedItems(loadCtx))
             {
@@ -228,8 +229,8 @@ public abstract class ListenerProxy implements 
CommandListener, Comparable<Liste
         public void onChange(SafeCommandStore safeStore, Command c)
         {
             AccordCommand command = (AccordCommand) c;
-            AccordCommandStore commandStore = (AccordCommandStore) safeStore;
-            AsyncContext context = commandStore.getContext();
+            SafeAccordCommandStore commandStore = (SafeAccordCommandStore) 
safeStore;
+            AsyncContext context = commandStore.context();
             PreLoadContext loadCtx = 
PreLoadContext.contextFor(ImmutableList.of(command.txnId()), Keys.of(key));
             if (context.containsScopedItems(loadCtx))
             {
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java 
b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 7fb1ca8f34..81cb329f59 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -25,7 +25,6 @@ import accord.primitives.Range;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
 
@@ -36,9 +35,9 @@ public class TokenRange extends Range.EndInclusive
         super(start, end);
     }
 
-    public static TokenRange fullRange(TableId tableId)
+    public static TokenRange fullRange(String keyspace)
     {
-        return new TokenRange(SentinelKey.min(tableId), 
SentinelKey.max(tableId));
+        return new TokenRange(SentinelKey.min(keyspace), 
SentinelKey.max(keyspace));
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
index f4066e9c1c..d19f832ace 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java
@@ -22,32 +22,28 @@ import java.util.Objects;
 
 import accord.primitives.RoutableKey;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 
 public abstract class AccordRoutableKey implements RoutableKey
 {
-    final TableId tableId;
+    final String keyspace; // TODO (desired): use an id (TrM)
 
-    protected AccordRoutableKey(TableId tableId)
+    protected AccordRoutableKey(String keyspace)
     {
-        this.tableId = tableId;
+        this.keyspace = keyspace;
     }
 
-    public final TableId tableId() { return tableId; }
+    public final String keyspace() { return keyspace; }
     public abstract Token token();
 
-    @Override
-    public final int routingHash()
-    {
-        return token().tokenHash();
-    }
-
     @Override
     public int hashCode()
     {
-        return Objects.hash(tableId, routingHash());
+        return Objects.hash(keyspace, token().tokenHash());
     }
 
+    @Override
     public final int compareTo(RoutableKey that)
     {
         return compareTo((AccordRoutableKey) that);
@@ -55,18 +51,24 @@ public abstract class AccordRoutableKey implements 
RoutableKey
 
     public final int compareTo(AccordRoutableKey that)
     {
-        int cmp = this.tableId().compareTo(that.tableId());
+        int cmp = this.keyspace().compareTo(that.keyspace());
         if (cmp != 0)
             return cmp;
 
-        if (this instanceof AccordRoutingKey.SentinelKey || that instanceof 
AccordRoutingKey.SentinelKey)
+        if (this.getClass() == SentinelKey.class || that.getClass() == 
SentinelKey.class)
         {
-            int leftInt = this instanceof AccordRoutingKey.SentinelKey ? 
((AccordRoutingKey.SentinelKey) this).asInt() : 0;
-            int rightInt = that instanceof AccordRoutingKey.SentinelKey ? 
((AccordRoutingKey.SentinelKey) that).asInt() : 0;
+            int leftInt = this.getClass() == SentinelKey.class ? 
((SentinelKey) this).asInt() : 0;
+            int rightInt = that.getClass() == SentinelKey.class ? 
((SentinelKey) that).asInt() : 0;
             return Integer.compare(leftInt, rightInt);
         }
 
-        return this.token().compareTo(that.token());
+        cmp = this.token().compareTo(that.token());
+        if (cmp != 0)
+            return cmp;
+
+        if (this.getClass() == TokenKey.class)
+            return that.getClass() == TokenKey.class ? 0 : 1;
+        return that.getClass() == TokenKey.class ? -1 : 
((PartitionKey)this).tableId.compareTo(((PartitionKey)that).tableId);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index e9c600ae4a..8a763f5e61 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -19,23 +19,29 @@
 package org.apache.cassandra.service.accord.api;
 
 import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.TreeMap;
 
 import accord.api.Key;
 import accord.api.RoutingKey;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import accord.local.ShardDistributor;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
+import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+
 public abstract class AccordRoutingKey extends AccordRoutableKey implements 
RoutingKey
 {
     enum RoutingKeyKind
@@ -43,9 +49,9 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         TOKEN, SENTINEL
     }
 
-    protected AccordRoutingKey(TableId tableId)
+    protected AccordRoutingKey(String keyspace)
     {
-        super(tableId);
+        super(keyspace);
     }
 
     public abstract RoutingKeyKind kindOfRoutingKey();
@@ -56,22 +62,23 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         return (AccordRoutingKey) key;
     }
 
-    public static class SentinelKey extends AccordRoutingKey
+    // final in part because we refer to its class directly in 
AccordRoutableKey.compareTo
+    public static final class SentinelKey extends AccordRoutingKey
     {
         private static final long EMPTY_SIZE = ObjectSizes.measure(new 
SentinelKey(null, true));
 
         private final boolean isMin;
 
-        private SentinelKey(TableId tableId, boolean isMin)
+        private SentinelKey(String keyspace, boolean isMin)
         {
-            super(tableId);
+            super(keyspace);
             this.isMin = isMin;
         }
 
         @Override
         public int hashCode()
         {
-            return Objects.hash(tableId, isMin);
+            return Objects.hash(keyspace, isMin);
         }
 
         @Override
@@ -86,20 +93,20 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             return EMPTY_SIZE;
         }
 
-        public static SentinelKey min(TableId tableId)
+        public static SentinelKey min(String keyspace)
         {
-            return new SentinelKey(tableId, true);
+            return new SentinelKey(keyspace, true);
         }
 
-        public static SentinelKey max(TableId tableId)
+        public static SentinelKey max(String keyspace)
         {
-            return new SentinelKey(tableId, false);
+            return new SentinelKey(keyspace, false);
         }
 
         public TokenKey toTokenKey()
         {
-            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
-            return new TokenKey(tableId, isMin ?
+            IPartitioner partitioner = getPartitioner();
+            return new TokenKey(keyspace, isMin ?
                                          
partitioner.getMinimumToken().increaseSlightly() :
                                          
partitioner.getMaximumToken().decreaseSlightly());
         }
@@ -119,7 +126,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         public String toString()
         {
             return "SentinelKey{" +
-                   "tableId=" + tableId +
+                   "keyspace=" + keyspace +
                    ", key=" + (isMin ? "min": "max") +
                    '}';
         }
@@ -130,39 +137,40 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             public void serialize(SentinelKey key, DataOutputPlus out, int 
version) throws IOException
             {
                 out.writeBoolean(key.isMin);
-                key.tableId().serialize(out);
+                out.writeUTF(key.keyspace);
             }
 
             @Override
             public SentinelKey deserialize(DataInputPlus in, int version) 
throws IOException
             {
                 boolean isMin = in.readBoolean();
-                TableId tableId = TableId.deserialize(in);
-                return new SentinelKey(tableId, isMin);
+                String keyspace = in.readUTF();
+                return new SentinelKey(keyspace, isMin);
             }
 
             @Override
             public long serializedSize(SentinelKey key, int version)
             {
-                return TypeSizes.BOOL_SIZE + TableId.serializedSize();
+                return TypeSizes.BOOL_SIZE + TypeSizes.sizeof(key.keyspace);
             }
         };
     }
 
-    public static class TokenKey extends AccordRoutingKey
+    // final in part because we refer to its class directly in 
AccordRoutableKey.compareTo
+    public static final class TokenKey extends AccordRoutingKey
     {
         private static final long EMPTY_SIZE;
 
         static
         {
-            Token key = 
DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER).getToken();
+            Token key = 
getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER).getToken();
             EMPTY_SIZE = ObjectSizes.measureDeep(new TokenKey(null, key));
         }
 
         final Token token;
-        public TokenKey(TableId tableId, Token token)
+        public TokenKey(String keyspace, Token token)
         {
-            super(tableId);
+            super(keyspace);
             this.token = token;
         }
 
@@ -182,7 +190,7 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
         public String toString()
         {
             return "TokenKey{" +
-                   "tableId=" + tableId() +
+                   "keyspace=" + keyspace() +
                    ", key=" + token() +
                    '}';
         }
@@ -200,24 +208,22 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             @Override
             public void serialize(TokenKey key, DataOutputPlus out, int 
version) throws IOException
             {
-                key.tableId().serialize(out);
+                out.writeUTF(key.keyspace);
                 Token.compactSerializer.serialize(key.token, out, version);
             }
 
             @Override
             public TokenKey deserialize(DataInputPlus in, int version) throws 
IOException
             {
-                TableId tableId = TableId.deserialize(in);
-                TableMetadata metadata = 
Schema.instance.getTableMetadata(tableId);
-                // TODO: metadata might be null here if the table was dropped?
-                Token token = Token.compactSerializer.deserialize(in, 
metadata.partitioner, version);
-                return new TokenKey(tableId, token);
+                String keyspace = in.readUTF();
+                Token token = Token.compactSerializer.deserialize(in, 
getPartitioner(), version);
+                return new TokenKey(keyspace, token);
             }
 
             @Override
             public long serializedSize(TokenKey key, int version)
             {
-                return TableId.serializedSize() + 
Token.compactSerializer.serializedSize(key.token(), version);
+                return TypeSizes.sizeof(key.keyspace) + 
Token.compactSerializer.serializedSize(key.token(), version);
             }
         }
     }
@@ -275,4 +281,37 @@ public abstract class AccordRoutingKey extends 
AccordRoutableKey implements Rout
             return size;
         }
     };
+
+    public static class KeyspaceSplitter implements ShardDistributor
+    {
+        final EvenSplit<BigInteger> subSplitter;
+        public KeyspaceSplitter(EvenSplit<BigInteger> subSplitter)
+        {
+            this.subSplitter = subSplitter;
+        }
+
+        @Override
+        public List<Ranges> split(Ranges ranges)
+        {
+            Map<String, List<Range>> byKeyspace = new TreeMap<>();
+            for (Range range : ranges)
+            {
+                
byKeyspace.computeIfAbsent(((AccordRoutableKey)range.start()).keyspace, ignore 
-> new ArrayList<>())
+                          .add(range);
+            }
+
+            List<Ranges> results = new ArrayList<>();
+            for (List<Range> keyspaceRanges : byKeyspace.values())
+            {
+                List<Ranges> splits = 
subSplitter.split(Ranges.ofSortedAndDeoverlapped(keyspaceRanges.toArray(new 
Range[0])));
+
+                for (int i = 0; i < splits.size(); i++)
+                {
+                    if (i == results.size()) results.add(Ranges.EMPTY);
+                    results.set(i, results.get(i).union(splits.get(i)));
+                }
+            }
+            return results;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java 
b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
index 18296d4adc..13e5439802 100644
--- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord.api;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 
 import com.google.common.base.Preconditions;
 
@@ -45,21 +44,24 @@ import 
org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 
-public class PartitionKey extends AccordRoutableKey implements Key
+// final in part because we refer to its class directly in 
AccordRoutableKey.compareTo
+public final class PartitionKey extends AccordRoutableKey implements Key
 {
     private static final long EMPTY_SIZE;
 
     static
     {
         DecoratedKey key = 
DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-        EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, key));
+        EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, null, 
key));
     }
 
+    final TableId tableId; // TODO (expected): move to PartitionKey
     final DecoratedKey key;
 
-    public PartitionKey(TableId tableId, DecoratedKey key)
+    public PartitionKey(String keyspace, TableId tableId, DecoratedKey key)
     {
-        super(tableId);
+        super(keyspace);
+        this.tableId = tableId;
         this.key = key;
     }
 
@@ -70,14 +72,16 @@ public class PartitionKey extends AccordRoutableKey 
implements Key
 
     public static PartitionKey of(Partition partition)
     {
-        return new PartitionKey(partition.metadata().id, 
partition.partitionKey());
+        return new PartitionKey(partition.metadata().keyspace, 
partition.metadata().id, partition.partitionKey());
     }
 
     public static PartitionKey of(SinglePartitionReadCommand command)
     {
-        return new PartitionKey(command.metadata().id, command.partitionKey());
+        return new PartitionKey(command.metadata().keyspace, 
command.metadata().id, command.partitionKey());
     }
 
+    public final TableId tableId() { return tableId; }
+
     @Override
     public Token token()
     {
@@ -92,7 +96,7 @@ public class PartitionKey extends AccordRoutableKey 
implements Key
     @Override
     public RoutingKey toUnseekable()
     {
-        return new TokenKey(tableId(), token());
+        return new TokenKey(keyspace, token());
     }
 
     public long estimatedSizeOnHeap()
@@ -147,20 +151,20 @@ public class PartitionKey extends AccordRoutableKey 
implements Key
             TableId tableId = TableId.deserialize(in);
             TableMetadata metadata = 
Schema.instance.getExistingTableMetadata(tableId);
             DecoratedKey key = 
metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
-            return new PartitionKey(tableId, key);
+            return new PartitionKey(metadata.keyspace, tableId, key);
         }
 
         public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, 
int offset) throws IOException
         {
             TableId tableId = TableId.deserialize(src, accessor, offset);
-            offset += TableId.serializedSize();
+            offset += tableId.serializedSize();
             TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
             int numBytes = accessor.getShort(src, offset);
             offset += TypeSizes.SHORT_SIZE;
             ByteBuffer bytes = ByteBuffer.allocate(numBytes);
             accessor.copyTo(src, offset, bytes, ByteBufferAccessor.instance, 
0, numBytes);
             DecoratedKey key = metadata.partitioner.decorateKey(bytes);
-            return new PartitionKey(tableId, key);
+            return new PartitionKey(metadata.keyspace, tableId, key);
         }
 
         @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index d52fcb1439..f03eab2f0d 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -33,6 +33,7 @@ import accord.local.SafeCommandStore;
 import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 import org.apache.cassandra.service.accord.AccordCommandStore;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
@@ -140,6 +141,7 @@ public abstract class AsyncOperation<R> extends 
AsyncPromise<R> implements Runna
 
     protected void runInternal()
     {
+        SafeAccordCommandStore safeStore = commandStore.safeStore(context);
         switch (state)
         {
             case INITIALIZED:
@@ -149,7 +151,7 @@ public abstract class AsyncOperation<R> extends 
AsyncPromise<R> implements Runna
                     return;
 
                 state = State.RUNNING;
-                result = apply(commandStore);
+                result = apply(safeStore);
 
                 state = State.SAVING;
             case SAVING:
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index 55f1400075..20b56b4c9c 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
-
 import accord.api.Data;
 import accord.api.Result;
 import org.apache.cassandra.db.TypeSizes;
@@ -131,20 +129,16 @@ public class TxnData implements Data, Result, 
Iterable<FilteredPartition>
         public void serialize(FilteredPartition partition, DataOutputPlus out, 
int version) throws IOException
         {
             partition.metadata().id.serialize(out);
-            TableMetadata metadata = 
Schema.instance.getTableMetadata(partition.metadata().id);
-
             try (UnfilteredRowIterator iterator = 
partition.unfilteredIterator())
             {
-                // TODO: Will metadata be null if we've dropped a table?
-                UnfilteredRowIteratorSerializer.serializer.serialize(iterator, 
ColumnFilter.all(metadata), out, version, partition.rowCount());
+                UnfilteredRowIteratorSerializer.serializer.serialize(iterator, 
ColumnFilter.all(partition.metadata()), out, version, partition.rowCount());
             }
         }
 
         @Override
         public FilteredPartition deserialize(DataInputPlus in, int version) 
throws IOException
         {
-            TableMetadata metadata = 
Schema.instance.getTableMetadata(TableId.deserialize(in));
-            Preconditions.checkState(metadata != null);
+            TableMetadata metadata = 
Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
             try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, 
ColumnFilter.all(metadata), DeserializationHelper.Flag.FROM_REMOTE))
             {
                 return new 
FilteredPartition(UnfilteredRowIterators.filter(partition, 0));
@@ -154,12 +148,11 @@ public class TxnData implements Data, Result, 
Iterable<FilteredPartition>
         @Override
         public long serializedSize(FilteredPartition partition, int version)
         {
-            long size = TableId.serializedSize();
-            TableMetadata metadata = 
Schema.instance.getTableMetadata(partition.metadata().id);
-            Preconditions.checkState(metadata != null);
+            TableId tableId = partition.metadata().id;
+            long size = tableId.serializedSize();
             try (UnfilteredRowIterator iterator = 
partition.unfilteredIterator())
             {
-                return size + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iterator, 
ColumnFilter.all(metadata), version, partition.rowCount());
+                return size + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iterator, 
ColumnFilter.all(partition.metadata()), version, partition.rowCount());
             }
         }
     };
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 089e1cda34..fe98121f3f 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -58,7 +58,7 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
     {
         super(value);
         this.name = name;
-        this.key = new PartitionKey(value.metadata().id, value.partitionKey());
+        this.key = new PartitionKey(value.metadata().keyspace, 
value.metadata().id, value.partitionKey());
     }
 
     private TxnNamedRead(TxnDataName name, PartitionKey key, ByteBuffer bytes)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 7b26b2acdd..0eb3e781aa 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -161,7 +161,7 @@ public class TxnUpdate implements Update
             else if (ByteBufferUtil.compareUnsigned(left[l], right[r]) != 0) { 
throw new IllegalStateException("The same keys have different values in each 
input"); }
             else { out[o++] = left[l++]; r++; }
         }
-        while (l < leftKeys.size()) { out[o++] = left[l]; }
+        while (l < leftKeys.size()) { out[o++] = left[l++]; }
         while (r < rightKeys.size()) { out[o++] = right[r++]; }
         return out;
     }
diff --git a/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java b/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java
index f40c284b0e..e4ff1bc66e 100644
--- a/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.cassandra.dht;
 
+import java.util.Arrays;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
+
 public class ByteOrderedPartitionerTest extends PartitionerTestCase
 {
     public void initPartitioner()
@@ -28,4 +34,14 @@ public class ByteOrderedPartitionerTest extends PartitionerTestCase
     {
         return false;
     }
+
+    @Override
+    protected void checkRoundTrip(Token original, Token roundTrip)
+    {
+        BytesToken orig = (BytesToken) original;
+        BytesToken rt = (BytesToken) roundTrip;
+        Assert.assertArrayEquals(orig.token, Arrays.copyOf(rt.token, 
orig.token.length));
+        for (int i = orig.token.length ; i < rt.token.length ; ++i)
+            Assert.assertEquals((byte)0, rt.token[i]);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index ca6504ced8..626d8e0c41 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -21,7 +21,9 @@ import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 
+import accord.primitives.Ranges;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.BufferDecoratedKey;
@@ -37,7 +39,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 
-public class LengthPartitioner implements IPartitioner
+public class LengthPartitioner extends AccordSplitter implements IPartitioner
 {
     public static final BigInteger ZERO = new BigInteger("0");
     public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
@@ -179,4 +181,34 @@ public class LengthPartitioner implements IPartitioner
     {
         return new PartitionerDefinedOrder(this);
     }
+
+    @Override
+    public Function<Ranges, AccordSplitter> accordSplitter()
+    {
+        return ignore -> this;
+    }
+
+    @Override
+    BigInteger valueForToken(Token token)
+    {
+        return ((BigIntegerToken)token).token;
+    }
+
+    @Override
+    Token tokenForValue(BigInteger value)
+    {
+        return new BigIntegerToken(value);
+    }
+
+    @Override
+    BigInteger minimumValue()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    BigInteger maximumValue()
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java b/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java
index 6ab5b456b3..f8e65bd89f 100644
--- a/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java
@@ -18,11 +18,16 @@
 package org.apache.cassandra.dht;
 
 import java.io.IOException;
+import java.math.BigInteger;
 
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 
 public class OrderPreservingPartitionerTest extends PartitionerTestCase
 {
@@ -44,15 +49,12 @@ public class OrderPreservingPartitionerTest extends PartitionerTestCase
         return false;
     }
 
-    @Test
-    public void testCompare()
+    @Override
+    protected void checkRoundTrip(Token original, Token roundTrip)
     {
-        assert tok("").compareTo(tok("asdf")) < 0;
-        assert tok("asdf").compareTo(tok("")) > 0;
-        assert tok("").compareTo(tok("")) == 0;
-        assert tok("z").compareTo(tok("a")) > 0;
-        assert tok("a").compareTo(tok("z")) < 0;
-        assert tok("asdf").compareTo(tok("asdf")) == 0;
-        assert tok("asdz").compareTo(tok("asdf")) > 0;
+        StringToken orig = (StringToken) original;
+        StringToken rt = (StringToken) roundTrip;
+        Assert.assertEquals(orig.token, rt.token.substring(0, 
orig.token.length()));
+        
Assert.assertTrue(rt.token.substring(orig.token.length()).matches("\0*"));
     }
 }
diff --git a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
index ec535b0d6a..a4346b8a0c 100644
--- a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
+++ b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.dht;
 
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -24,13 +25,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import accord.primitives.Ranges;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -215,4 +221,113 @@ public abstract class PartitionerTestCase
             totalOwnership += ownership;
         assertEquals(1.0, totalOwnership, 0.001);
     }
+
+    @Test
+    public void testCompare()
+    {
+        if (!partitioner.preservesOrder())
+            return;
+
+        assert tok("").compareTo(tok("asdf")) < 0;
+        assert tok("asdf").compareTo(tok("")) > 0;
+        assert tok("").compareTo(tok("")) == 0;
+        assert tok("z").compareTo(tok("a")) > 0;
+        assert tok("a").compareTo(tok("z")) < 0;
+        assert tok("asdf").compareTo(tok("asdf")) == 0;
+        assert tok("asdz").compareTo(tok("asdf")) > 0;
+    }
+
+    @Test
+    public void testCompareSplitter()
+    {
+        for (int i = 0 ; i < 16 ; ++i)
+        {
+            Token a = partitioner.getRandomToken(), b = 
partitioner.getRandomToken();
+            while (a.equals(b))
+                b = partitioner.getRandomToken();
+            if (a.compareTo(b) > 0) { Token tmp = a; a = b; b = tmp; }
+            testCompareSplitter(a, b);
+        }
+
+        if (!partitioner.preservesOrder())
+            return;
+
+        testCompareSplitter(tok(""), tok("asdf"));
+        testCompareSplitter(tok(""), tok(""));
+        testCompareSplitter(tok("a"), tok("z"));
+        testCompareSplitter(tok("asdf"), tok("asdf"));
+        testCompareSplitter(tok("asd"), tok("asdf"));
+        testCompareSplitter(tok("asdf"), tok("asf"));
+        testCompareSplitter(tok("asdf"), tok("asdz"));
+    }
+
+    @Test
+    public void testSplitter()
+    {
+        for (int i = 0 ; i < 1024 ; ++i)
+        {
+            Token a = partitioner.getRandomToken(), b = 
partitioner.getRandomToken();
+            while (a.equals(b))
+                b = partitioner.getRandomToken();
+            if (a.compareTo(b) > 0) { Token tmp = a; a = b; b = tmp; }
+            testSplitter(a, b);
+        }
+
+        if (!partitioner.preservesOrder())
+            return;
+
+        testSplitter(tok(""), tok("asdf"));
+        testSplitter(tok("a"), tok("z"));
+        testSplitter(tok("asd"), tok("asdf"));
+        testSplitter(tok("asdf"), tok("asdz"));
+    }
+
+    void testCompareSplitter(Token less, Token more)
+    {
+        Ranges ranges;
+        if (less.equals(more) && less.isMinimum())
+            ranges = Ranges.EMPTY;
+        else if (less.equals(more))
+            ranges = Ranges.of(new TokenRange(new TokenKey("", 
partitioner.getMinimumToken()), new TokenKey("", less)));
+        else
+            ranges = Ranges.of(new TokenRange(new TokenKey("", less), new 
TokenKey("", more)));
+
+        AccordSplitter splitter = partitioner.accordSplitter().apply(ranges);
+        BigInteger lv = splitter.valueForToken(less);
+        BigInteger rv = splitter.valueForToken(more);
+        Assert.assertEquals(less.equals(more) ? 0 : -1, 
normaliseCompare(lv.compareTo(rv)));
+        Assert.assertEquals(less.equals(more) ? 0 : 1, 
normaliseCompare(rv.compareTo(lv)));
+        checkRoundTrip(less, splitter.tokenForValue(lv));
+        checkRoundTrip(more, splitter.tokenForValue(rv));
+    }
+
+    void testSplitter(Token start, Token end)
+    {
+        accord.primitives.Range range = new TokenRange(new TokenKey("", 
start), new TokenKey("", end));
+        AccordSplitter splitter = 
partitioner.accordSplitter().apply(Ranges.of(range));
+        if (!start.isMinimum())
+            testSplitter(new TokenRange(new TokenKey("", 
partitioner.getMinimumToken()), new TokenKey("", start)));
+        testSplitter(new TokenRange(new TokenKey("", start), new TokenKey("", 
splitter.tokenForValue(splitter.maximumValue()))));
+        checkRoundTrip(start, 
splitter.tokenForValue(splitter.valueForToken(start)));
+        checkRoundTrip(end, 
splitter.tokenForValue(splitter.valueForToken(end)));
+    }
+
+    void testSplitter(accord.primitives.Range range)
+    {
+        AccordSplitter splitter = 
partitioner.accordSplitter().apply(Ranges.of(range));
+        BigInteger size = splitter.sizeOf(range);
+        Assert.assertEquals(range, splitter.subRange(range, BigInteger.ZERO, 
size));
+    }
+
+    protected void checkRoundTrip(Token original, Token roundTrip)
+    {
+        Assert.assertEquals(original, roundTrip);
+    }
+
+    static int normaliseCompare(int compareResult)
+    {
+        if (compareResult < 0) return -1;
+        if (compareResult > 0) return 1;
+        return 0;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 67683661e1..fa16a474f8 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
+import 
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -75,7 +76,7 @@ public class AccordCommandTest
     private static PartitionKey key(int k)
     {
         TableMetadata metadata = Schema.instance.getTableMetadata("ks", "tbl");
-        return new PartitionKey(metadata.id, 
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(k)));
+        return new PartitionKey(metadata.keyspace, metadata.id, 
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(k)));
     }
 
     /**
@@ -85,7 +86,7 @@ public class AccordCommandTest
     public void basicCycleTest() throws ExecutionException, 
InterruptedException
     {
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
-        commandStore.execute(PreLoadContext.empty(), instance -> { 
((AccordCommandStore) instance).setCacheSize(0); }).get();
+        commandStore.execute(PreLoadContext.empty(), instance -> { 
((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get();
 
 
         TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
@@ -167,7 +168,7 @@ public class AccordCommandTest
     public void computeDeps() throws Throwable
     {
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
-        commandStore.execute(PreLoadContext.empty(), instance -> { 
((AccordCommandStore) instance).setCacheSize(0); }).get();
+        commandStore.execute(PreLoadContext.empty(), instance -> { 
((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get();
 
         TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
         Txn txn = createTxn(2);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 20142c439b..b835e0fbf5 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -18,14 +18,9 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.function.LongSupplier;
 
 import javax.annotation.Nullable;
@@ -41,7 +36,7 @@ import accord.api.RoutingKey;
 import accord.api.Write;
 import accord.impl.InMemoryCommandStore;
 import accord.local.Command;
-import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
@@ -51,7 +46,6 @@ import accord.primitives.Ballot;
 import accord.primitives.Ranges;
 import accord.primitives.Keys;
 import accord.primitives.PartialTxn;
-import accord.primitives.Range;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -63,7 +57,6 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.TransactionStatement;
 import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.accord.api.AccordAgent;
@@ -97,23 +90,6 @@ public class AccordTestUtils
         @Override public void waiting(TxnId blockedBy, Known blockedUntil, 
Unseekables<?, ?> blockedOn) {}
     };
 
-    public static Topology simpleTopology(TableId... tables)
-    {
-        Arrays.sort(tables, Comparator.naturalOrder());
-        Id node = localNodeId();
-        Shard[] shards = new Shard[tables.length];
-
-        List<Id> nodes = Lists.newArrayList(node);
-        Set<Id> fastPath = Sets.newHashSet(node);
-        for (int i=0; i<tables.length; i++)
-        {
-            Range range = TokenRange.fullRange(tables[i]);
-            shards[i] = new Shard(range, nodes, fastPath, 
Collections.emptySet());
-        }
-
-        return new Topology(1, shards);
-    }
-
     public static TxnId txnId(long epoch, long real, int logical, long node)
     {
         return new TxnId(epoch, real, logical, new Node.Id(node));
@@ -143,7 +119,7 @@ public class AccordTestUtils
                                 .map(key -> {
                                     try
                                     {
-                                        return read.read(key, command.kind(), 
commandStore, command.executeAt(), null).get();
+                                        return read.read(key, command.kind(), 
instance, command.executeAt(), null).get();
                                     }
                                     catch (InterruptedException e)
                                     {
@@ -202,8 +178,8 @@ public class AccordTestUtils
 
     public static Ranges fullRange(Txn txn)
     {
-        TableId tableId = ((PartitionKey)txn.keys().get(0)).tableId();
-        return Ranges.of(TokenRange.fullRange(tableId));
+        PartitionKey key = (PartitionKey) txn.keys().get(0);
+        return Ranges.of(TokenRange.fullRange(key.keyspace()));
     }
 
     public static PartialTxn createPartialTxn(int key)
@@ -213,46 +189,21 @@ public class AccordTestUtils
         return new PartialTxn.InMemory(ranges, txn.kind(), txn.keys(), 
txn.read(), txn.query(), txn.update());
     }
 
-    private static class SingleEpochRanges implements 
CommandStore.RangesForEpoch
+    private static class SingleEpochRanges extends 
CommandStores.RangesForEpochHolder
     {
         private final Ranges ranges;
 
         public SingleEpochRanges(Ranges ranges)
         {
             this.ranges = ranges;
-        }
-
-        @Override
-        public Ranges at(long epoch)
-        {
-            assert epoch == 1;
-            return ranges;
-        }
-
-        @Override
-        public Ranges between(long fromInclusive, long toInclusive)
-        {
-            return ranges;
-        }
-
-        @Override
-        public Ranges since(long epoch)
-        {
-            assert epoch == 1;
-            return ranges;
-        }
-
-        @Override
-        public boolean owns(long epoch, RoutingKey key)
-        {
-            return ranges.contains(key);
+            this.current = new CommandStores.RangesForEpoch(1, ranges);
         }
     }
 
     public static InMemoryCommandStore.Synchronized 
createInMemoryCommandStore(LongSupplier now, String keyspace, String table)
     {
         TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, 
table);
-        TokenRange range = TokenRange.fullRange(metadata.id);
+        TokenRange range = TokenRange.fullRange(metadata.keyspace);
         Node.Id node = 
EndpointMapping.endpointToId(FBUtilities.getBroadcastAddressAndPort());
         Topology topology = new Topology(1, new Shard(range, 
Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet()));
         NodeTimeService time = new NodeTimeService()
@@ -262,7 +213,7 @@ public class AccordTestUtils
             @Override public long now() {return now.getAsLong(); }
             @Override public Timestamp uniqueNow(Timestamp atLeast) { return 
new Timestamp(1, now.getAsLong(), 0, node); }
         };
-        return new InMemoryCommandStore.Synchronized(0, 0, 1, 8,
+        return new InMemoryCommandStore.Synchronized(0,
                                                      time,
                                                      new AccordAgent(),
                                                      null,
@@ -272,11 +223,6 @@ public class AccordTestUtils
 
     public static AccordCommandStore createAccordCommandStore(Node.Id node, 
LongSupplier now, Topology topology)
     {
-        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
-            Thread thread = new Thread(r);
-            thread.setName(CommandStore.class.getSimpleName() + '[' + node + 
':' + 0 + ']');
-            return thread;
-        });
         NodeTimeService time = new NodeTimeService()
         {
             @Override public Id id() { return node;}
@@ -284,21 +230,22 @@ public class AccordTestUtils
             @Override public long now() {return now.getAsLong(); }
             @Override public Timestamp uniqueNow(Timestamp atLeast) { return 
new Timestamp(1, now.getAsLong(), 0, node); }
         };
-        return new AccordCommandStore(0, 0, 0, 1,
+        return new AccordCommandStore(0,
                                       time,
                                       new AccordAgent(),
                                       null,
                                       cs -> NOOP_PROGRESS_LOG,
-                                      new 
SingleEpochRanges(topology.rangesForNode(node)),
-                                      executor);
+                                      new 
SingleEpochRanges(topology.rangesForNode(node)));
     }
     public static AccordCommandStore createAccordCommandStore(LongSupplier 
now, String keyspace, String table)
     {
         TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, 
table);
-        TokenRange range = TokenRange.fullRange(metadata.id);
+        TokenRange range = TokenRange.fullRange(metadata.keyspace);
         Node.Id node = 
EndpointMapping.endpointToId(FBUtilities.getBroadcastAddressAndPort());
         Topology topology = new Topology(1, new Shard(range, 
Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet()));
-        return createAccordCommandStore(node, now, topology);
+        AccordCommandStore store = createAccordCommandStore(node, now, 
topology);
+        store.execute(PreLoadContext.empty(), safeStore -> 
((AccordCommandStore)safeStore.commandStore()).setCacheSize(1 << 20));
+        return store;
     }
 
     public static void execute(AccordCommandStore commandStore, Runnable 
runnable)
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
index a7a015a7c1..8b34803d6a 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java
@@ -62,10 +62,10 @@ public class AccordTopologyTest
         Token maxToken = partitioner.getMaximumToken();
 
 //        topology.forKey(new AccordKey.TokenKey(tableId, 
minToken.minKeyBound()));
-        topology.forKey(new PartitionKey(tableId, new 
BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))).toUnseekable());
+        topology.forKey(new PartitionKey("ks", tableId, new 
BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))).toUnseekable());
 //        topology.forKey(new AccordKey.TokenKey(tableId, 
minToken.maxKeyBound()));
 //        topology.forKey(new AccordKey.TokenKey(tableId, 
maxToken.minKeyBound()));
-        topology.forKey(new PartitionKey(tableId, new 
BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))).toUnseekable());
+        topology.forKey(new PartitionKey("ks", tableId, new 
BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))).toUnseekable());
 //        topology.forKey(new AccordKey.TokenKey(tableId, 
maxToken.maxKeyBound()));
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
index f520a6bb6c..64feaedc98 100644
--- a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java
@@ -59,7 +59,7 @@ public class AccordKeyTest
     public void partitionKeyTest()
     {
         DecoratedKey dk = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
-        PartitionKey pk = new PartitionKey(TABLE1, dk);
+        PartitionKey pk = new PartitionKey("ks", TABLE1, dk);
         SerializerTestUtils.assertSerializerIOEquality(pk, 
PartitionKey.serializer);
     }
 
@@ -67,7 +67,7 @@ public class AccordKeyTest
     public void tokenKeyTest()
     {
         DecoratedKey dk = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
-        TokenKey pk = new TokenKey(TABLE1, dk.getToken());
+        TokenKey pk = new TokenKey("", dk.getToken());
         SerializerTestUtils.assertSerializerIOEquality(pk, 
TokenKey.serializer);
     }
 
@@ -75,12 +75,12 @@ public class AccordKeyTest
     public void comparisonTest()
     {
         DecoratedKey dk = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
-        PartitionKey pk = new PartitionKey(TABLE1, dk);
-        TokenKey tk = new TokenKey(TABLE1, dk.getToken());
-        TokenKey tkLow = new TokenKey(TABLE1, 
dk.getToken().decreaseSlightly());
-        TokenKey tkHigh = new TokenKey(TABLE1, 
dk.getToken().increaseSlightly());
+        PartitionKey pk = new PartitionKey("", TABLE1, dk);
+        TokenKey tk = new TokenKey("", dk.getToken());
+        TokenKey tkLow = new TokenKey("", dk.getToken().decreaseSlightly());
+        TokenKey tkHigh = new TokenKey("", dk.getToken().increaseSlightly());
 
-        Assert.assertTrue(tk.compareTo(pk) == 0);
+        Assert.assertTrue(tk.compareTo(pk) > 0);
         Assert.assertTrue(tkLow.compareTo(pk) < 0);
         Assert.assertTrue(pk.compareTo(tkHigh) < 0);
     }
@@ -91,10 +91,22 @@ public class AccordKeyTest
         Assert.assertTrue(TABLE1.compareTo(TABLE2) < 0);
 
         DecoratedKey dk1 = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
-        PartitionKey pk1 = new PartitionKey(TABLE1, dk1);
+        PartitionKey pk1 = new PartitionKey("", TABLE1, dk1);
 
         DecoratedKey dk2 = 
partitioner(TABLE2).decorateKey(ByteBufferUtil.bytes(5));
-        PartitionKey pk2 = new PartitionKey(TABLE2, dk2);
+        PartitionKey pk2 = new PartitionKey("", TABLE2, dk2);
+
+        Assert.assertTrue(pk1.compareTo(pk2) < 0);
+    }
+
+    @Test
+    public void keyspaceComparisonTest()
+    {
+        DecoratedKey dk1 = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
+        PartitionKey pk1 = new PartitionKey("a", TABLE1, dk1);
+
+        DecoratedKey dk2 = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
+        PartitionKey pk2 = new PartitionKey("b", TABLE1, dk2);
 
         Assert.assertTrue(pk1.compareTo(pk2) < 0);
     }
@@ -104,16 +116,18 @@ public class AccordKeyTest
     {
         Assert.assertTrue(TABLE1.compareTo(TABLE2) < 0);
         DecoratedKey dk1 = 
partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5));
-        PartitionKey pk1 = new PartitionKey(TABLE1, dk1);
+        PartitionKey pk1 = new PartitionKey("a", TABLE1, dk1);
 
         DecoratedKey dk2 = 
partitioner(TABLE2).decorateKey(ByteBufferUtil.bytes(5));
-        PartitionKey pk2 = new PartitionKey(TABLE2, dk2);
+        PartitionKey pk2 = new PartitionKey("b", TABLE2, dk2);
 
-        SentinelKey loSentinel = SentinelKey.min(TABLE1);
-        SentinelKey hiSentinel = SentinelKey.max(TABLE1);
+        SentinelKey loSentinel = SentinelKey.min("a");
+        SentinelKey hiSentinel = SentinelKey.max("a");
         Assert.assertTrue(loSentinel.compareTo(hiSentinel) < 0);
         Assert.assertTrue(pk1.compareTo(loSentinel) > 0);
         Assert.assertTrue(loSentinel.compareTo(pk1) < 0);
+        Assert.assertTrue(pk1.compareTo(hiSentinel) < 0);
+        Assert.assertTrue(hiSentinel.compareTo(pk1) > 0);
         Assert.assertTrue(hiSentinel.compareTo(pk2) < 0);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index 30edd5f8af..f12943bbb5 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -109,8 +109,8 @@ public class AsyncOperationTest
         Txn txn = createTxn((int)clock.incrementAndGet());
         PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
 
-        commandStore.execute(contextFor(Collections.emptyList(), 
Keys.of(key)),instance -> {
-            CommandsForKey cfk = commandStore.maybeCommandsForKey(key);
+        commandStore.execute(contextFor(Collections.emptyList(), 
Keys.of(key)), instance -> {
+            CommandsForKey cfk = instance.maybeCommandsForKey(key);
             Assert.assertNull(cfk);
         }).get();
 
diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
index 3a4435b8c6..34fda94017 100644
--- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java
@@ -26,7 +26,6 @@ import accord.primitives.Ranges;
 import accord.primitives.Txn;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.accord.AccordTestUtils;
 import org.apache.cassandra.service.accord.TokenRange;
 import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -55,8 +54,8 @@ public class CommandSerializersTest
                                                  "    INSERT INTO ks.tbl (k, 
c, v) VALUES (0, 0, 1);\n" +
                                                  "  END IF\n" +
                                                  "COMMIT TRANSACTION");
-        TableId tableId = ((PartitionKey) txn.keys().get(0)).tableId();
-        PartialTxn expected = 
txn.slice(Ranges.of(TokenRange.fullRange(tableId)), true);
+        PartitionKey key = (PartitionKey) txn.keys().get(0);
+        PartialTxn expected = 
txn.slice(Ranges.of(TokenRange.fullRange(key.keyspace())), true);
         SerializerTestUtils.assertSerializerIOEquality(expected, 
CommandSerializers.partialTxn);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java b/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java
index 001206d858..890c760b0d 100644
--- a/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java
@@ -114,7 +114,7 @@ public class AbstractKeySortedTest
     private static PartitionKey key(int k)
     {
         DecoratedKey dk = 
ByteOrderedPartitioner.instance.decorateKey(ByteBufferUtil.bytes(k));
-        return new PartitionKey(TABLE1, dk);
+        return new PartitionKey("", TABLE1, dk);
     }
 
     private static Item item(int k, int v)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to