This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 3c37d9908dbfd64acc428738e1f9fe0a735294a4
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Fri Sep 20 13:12:20 2024 -0700

    CEP-15 (C*): Read accord repair cfk keys from sstable index.
    
    Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-19920
---
 .../db/marshal/AbstractCompositeType.java          |   5 +
 .../dht/LocalCompositePrefixPartitioner.java       | 341 ++++++++++++++++++++
 .../org/apache/cassandra/dht/LocalPartitioner.java |   2 +-
 .../dht/ReversedLongLocalPartitioner.java          |   4 +-
 .../apache/cassandra/io/sstable/KeyIterator.java   |  35 ++-
 .../cassandra/io/sstable/format/SSTableReader.java |   4 +-
 .../io/sstable/format/big/BigTableReader.java      |  26 +-
 .../io/sstable/format/bti/BtiTableReader.java      |  30 +-
 .../apache/cassandra/service/StorageService.java   |   4 +-
 .../cassandra/service/StorageServiceMBean.java     |   2 +-
 .../cassandra/service/accord/AccordKeyspace.java   | 350 ++++++++-------------
 .../service/accord/repair/AccordRepair.java        |   3 +-
 .../tools/nodetool/ConsensusMigrationAdmin.java    |   2 +-
 .../org/apache/cassandra/utils/MergeIterator.java  |  17 +
 .../io/sstable/format/ForwardingSSTableReader.java |   6 +
 .../cassandra/db/marshal/AbstractTypeTest.java     |   9 +
 .../org/apache/cassandra/dht/IPartitionerTest.java |  57 ++++
 .../dht/LocalCompositePrefixPartitionerTest.java   | 115 +++++++
 .../service/accord/AccordKeyspaceTest.java         |  24 +-
 .../apache/cassandra/utils/AccordGenerators.java   |   2 +-
 .../cassandra/utils/CassandraGenerators.java       |  81 ++++-
 .../cassandra/utils/CassandraGeneratorsTest.java   |  44 +++
 22 files changed, 884 insertions(+), 279 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index dfa7496ea6..44e2158078 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -88,6 +88,17 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
             ++i;
         }
 
+        return compareCustomRemainder(left, accessorL, offsetL, right, accessorR, offsetR);
+    }
+
+    /**
+     * Compares whatever remains of the two composite values from the given offsets; called at the end of the
+     * component-by-component comparison above. For example, an exhausted left value compares equal to an
+     * exhausted right value and sorts before a non-exhausted one. Subclasses may override this to change how
+     * (or whether) leftover components affect the ordering.
+     */
+    protected <VL, VR> int compareCustomRemainder(VL left, ValueAccessor<VL> accessorL, int offsetL, VR right, ValueAccessor<VR> accessorR, int offsetR)
+    {
         if (accessorL.isEmptyFromOffset(left, offsetL))
             return accessorR.sizeFromOffset(right, offsetR) == 0 ? 0 : -1;
 
diff --git a/src/java/org/apache/cassandra/dht/LocalCompositePrefixPartitioner.java b/src/java/org/apache/cassandra/dht/LocalCompositePrefixPartitioner.java
new file mode 100644
index 0000000000..c0a056e043
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/LocalCompositePrefixPartitioner.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+/**
+ * Local partitioner that supports range scans over composite partition keys using composite prefixes, via the
+ * key iterator methods it provides. This is necessary to correctly handle exclusive start and inclusive end
+ * prefixes, which don't work as intended with normal byte/component comparisons.
+ */
+public class LocalCompositePrefixPartitioner extends LocalPartitioner
+{
+    /**
+     * Composite type that compares only the components it has types for: {@code compareCustomRemainder} always
+     * returns 0, so once those leading components are equal any trailing bytes are ignored.
+     */
+    private static class PrefixCompositeType extends CompositeType
+    {
+        public PrefixCompositeType(List<AbstractType<?>> types)
+        {
+            super(types);
+        }
+
+        @Override
+        protected <VL, VR> int compareCustomRemainder(VL left, ValueAccessor<VL> accessorL, int offsetL, VR right, ValueAccessor<VR> accessorR, int offsetR)
+        {
+            return 0;
+        }
+    }
+
+    /**
+     * Token over a (possibly partial) composite key. Two tokens are compared using only as many leading
+     * components as the shorter of the two has, so a prefix token compares equal to every token starting with it.
+     */
+    public abstract class AbstractCompositePrefixToken extends LocalToken
+    {
+        public AbstractCompositePrefixToken(ByteBuffer token)
+        {
+            super(token);
+        }
+
+        @Override
+        public int compareTo(Token o)
+        {
+            Invariants.checkArgument(o instanceof AbstractCompositePrefixToken);
+            AbstractCompositePrefixToken that = (AbstractCompositePrefixToken) o;
+            CompositeType prefixComparator = comparatorForPrefixLength(Math.min(this.prefixSize(), that.prefixSize()));
+            return prefixComparator.compare(this.token, that.token);
+        }
+
+        /**
+         * Unsupported: equality is prefix-based and so not transitive (two different full keys can both equal the
+         * same prefix), which means no hash code could be consistent with {@link #equals(Object)}.
+         */
+        @Override
+        public int hashCode()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (!(obj instanceof AbstractCompositePrefixToken))
+                return false;
+            return compareTo((AbstractCompositePrefixToken) obj) == 0;
+        }
+
+        @Override
+        public ByteSource asComparableBytes(ByteComparable.Version version)
+        {
+            return comparatorForPrefixLength(prefixSize()).asComparableBytes(ByteBufferAccessor.instance, token, version);
+        }
+
+        ByteBuffer token()
+        {
+            return token;
+        }
+
+        /** Number of leading composite components encoded in this token. */
+        abstract int prefixSize();
+    }
+
+    /** Token for a complete composite key, i.e. one carrying every component. */
+    public class FullToken extends AbstractCompositePrefixToken
+    {
+        public FullToken(ByteBuffer token)
+        {
+            super(token);
+        }
+
+        @Override
+        int prefixSize()
+        {
+            return prefixComparators.size();
+        }
+    }
+
+    /** Token for a prefix of a composite key, carrying only its first {@code prefixSize} components. */
+    public class PrefixToken extends AbstractCompositePrefixToken
+    {
+        final int prefixSize;
+
+        public PrefixToken(ByteBuffer token, int prefixSize)
+        {
+            super(token);
+            Invariants.checkArgument(prefixSize > 0);
+            this.prefixSize = prefixSize;
+        }
+
+        @Override
+        int prefixSize()
+        {
+            return prefixSize;
+        }
+    }
+
+    // Only FullToken is handled here: toByteArray/toString cast to FullToken and every parse method produces one.
+    private final Token.TokenFactory tokenFactory = new Token.TokenFactory()
+    {
+        @Override
+        public Token fromComparableBytes(ByteSource.Peekable comparableBytes, ByteComparable.Version version)
+        {
+            ByteBuffer tokenData = comparator.fromComparableBytes(ByteBufferAccessor.instance, comparableBytes, version);
+            return new FullToken(tokenData);
+        }
+
+        @Override
+        public ByteBuffer toByteArray(Token token)
+        {
+            return ((FullToken)token).token();
+        }
+
+        @Override
+        public Token fromByteArray(ByteBuffer bytes)
+        {
+            return new FullToken(bytes);
+        }
+
+        @Override
+        public String toString(Token token)
+        {
+            return comparator.getString(((FullToken)token).token());
+        }
+
+        @Override
+        public void validate(String token)
+        {
+            comparator.validate(comparator.fromString(token));
+        }
+
+        @Override
+        public Token fromString(String string)
+        {
+            return new FullToken(comparator.fromString(string));
+        }
+    };
+
+    // prefixComparators.get(i) compares the first i + 1 components; the last entry is the full key comparator
+    private final List<CompositeType> prefixComparators;
+
+    public LocalCompositePrefixPartitioner(CompositeType comparator)
+    {
+        super(comparator);
+        ArrayList<CompositeType> comparators = new ArrayList<>(comparator.subTypes().size());
+        comparators.add(comparator);
+
+        // add comparators for successively shorter prefixes, then reverse so the list is indexed by length - 1
+        List<AbstractType<?>> subtypes = comparator.subTypes();
+        subtypes = subtypes.subList(0, subtypes.size() - 1);
+        while (!subtypes.isEmpty())
+        {
+            comparators.add(new PrefixCompositeType(subtypes));
+            subtypes = subtypes.subList(0, subtypes.size() - 1);
+        }
+
+        prefixComparators = ImmutableList.copyOf(Lists.reverse(comparators));
+    }
+
+    @SuppressWarnings("rawtypes")
+    public LocalCompositePrefixPartitioner(AbstractType... types)
+    {
+        this(CompositeType.getInstance(types));
+    }
+
+    private CompositeType comparatorForPrefixLength(int size)
+    {
+        return prefixComparators.get(size - 1);
+    }
+
+    /** Serializes the given leading component values as a (possibly partial) composite key. */
+    public ByteBuffer createPrefixKey(Object... values)
+    {
+        return comparatorForPrefixLength(values.length).decompose(values);
+    }
+
+    /** Returns a FullToken when every component is supplied, otherwise a PrefixToken over the supplied ones. */
+    public AbstractCompositePrefixToken createPrefixToken(Object... values)
+    {
+        ByteBuffer key = createPrefixKey(values);
+        return values.length == prefixComparators.size() ? new FullToken(key) : new PrefixToken(key, values.length);
+    }
+
+    /** Decorates a complete composite key built from all of its component values. */
+    public DecoratedKey decoratedKey(Object... values)
+    {
+        Invariants.checkArgument(values.length == prefixComparators.size());
+        ByteBuffer key = createPrefixKey(values);
+        return decorateKey(key);
+    }
+
+    @Override
+    public LocalToken getToken(ByteBuffer key)
+    {
+        return new FullToken(key);
+    }
+
+    @Override
+    public LocalToken getMinimumToken()
+    {
+        return new FullToken(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    }
+
+    @Override
+    public Token.TokenFactory getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    /**
+     * Returns the keys of {@code memtable} that fall within {@code range}, in order. The memtable is read from
+     * {@code range.left} onwards and the results are trimmed here according to the start and end inclusivity of
+     * {@code range}; a minimum right bound is treated as unbounded, as {@link AbstractBounds#contains} does.
+     */
+    private static CloseableIterator<DecoratedKey> keyIterator(Memtable memtable, AbstractBounds<PartitionPosition> range)
+    {
+        AbstractBounds<PartitionPosition> memtableRange = range.withNewRight(memtable.metadata().partitioner.getMinimumToken().maxKeyBound());
+        DataRange dataRange = new DataRange(memtableRange, new ClusteringIndexSliceFilter(Slices.ALL, false));
+        UnfilteredPartitionIterator iter = memtable.partitionIterator(ColumnFilter.NONE, dataRange, SSTableReadsListener.NOOP_LISTENER);
+
+        // compareTo thresholds encoding whether each end of the range is inclusive
+        int rangeStartCmpMin = range.isStartInclusive() ? 0 : 1;
+        int rangeEndCmpMax = range.isEndInclusive() ? 0 : -1;
+        boolean unboundedRight = range.right.isMinimum();
+
+        return new AbstractIterator<>()
+        {
+            @Override
+            protected DecoratedKey computeNext()
+            {
+                while (iter.hasNext())
+                {
+                    DecoratedKey key = iter.next().partitionKey();
+                    if (key.compareTo(range.left) < rangeStartCmpMin)
+                        continue;
+
+                    if (!unboundedRight && key.compareTo(range.right) > rangeEndCmpMax)
+                        break;
+
+                    return key;
+                }
+                return endOfData();
+            }
+
+            @Override
+            public void close()
+            {
+                iter.close();
+            }
+        };
+    }
+
+    /**
+     * Returns a DecoratedKey iterator over the live memtables and sstables of {@code metadata}'s table for the
+     * given range, merged in key order. Skips reading data files for sstable formats with a partition index file.
+     * If opening any source fails, the sources opened so far are closed before the exception is rethrown.
+     */
+    public static CloseableIterator<DecoratedKey> keyIterator(TableMetadata metadata, AbstractBounds<PartitionPosition> range) throws IOException
+    {
+        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata);
+        ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(range));
+
+        List<CloseableIterator<?>> closeableIterators = new ArrayList<>();
+        List<Iterator<DecoratedKey>> iterators = new ArrayList<>();
+
+        try
+        {
+            for (Memtable memtable : view.memtables)
+            {
+                CloseableIterator<DecoratedKey> iter = keyIterator(memtable, range);
+                iterators.add(iter);
+                closeableIterators.add(iter);
+            }
+
+            for (SSTableReader sstable : view.sstables)
+            {
+                CloseableIterator<DecoratedKey> iter = sstable.keyIterator(range);
+                iterators.add(iter);
+                closeableIterators.add(iter);
+            }
+
+            return MergeIterator.get(iterators, DecoratedKey::compareTo, new MergeIterator.Reducer.Trivial<>());
+        }
+        catch (Throwable e)
+        {
+            for (CloseableIterator<?> iter : closeableIterators)
+            {
+                try
+                {
+                    iter.close();
+                }
+                catch (Throwable e2)
+                {
+                    e.addSuppressed(e2);
+                }
+            }
+            throw e;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
index c2886fd539..0a1ede356b 100644
--- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java
@@ -40,7 +40,7 @@ public class LocalPartitioner implements IPartitioner
 {
     private static final long EMPTY_SIZE = ObjectSizes.measure(new LocalPartitioner(null).new LocalToken());
 
-    final AbstractType<?> comparator;   // package-private to avoid access workarounds in embedded LocalToken.
+    protected final AbstractType<?> comparator;   // protected (still package-visible for the embedded LocalToken) so subclasses such as LocalCompositePrefixPartitioner can use it.
 
     public LocalPartitioner(AbstractType<?> comparator)
     {
diff --git a/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
index abee8868e7..f95f1e7764 100644
--- a/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.function.Function;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 
 import accord.primitives.Ranges;
@@ -163,7 +164,8 @@ public class ReversedLongLocalPartitioner implements IPartitioner
         throw new UnsupportedOperationException("Accord is not supported by " + getClass().getName());
     }
 
-    private static class ReversedLongLocalToken extends Token
+    @VisibleForTesting
+    public static class ReversedLongLocalToken extends Token
     {
         private final long token;
 
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index dbe501f36e..c8c1709503 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -21,12 +21,16 @@ import java.io.IOException;
 import java.util.concurrent.locks.ReadWriteLock;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.CloseableIterator;
 
 public class KeyIterator extends AbstractIterator<DecoratedKey> implements CloseableIterator<DecoratedKey>
 {
+    // when non-null, keys outside these bounds are skipped and iteration stops once past a bounded right end
+    private final AbstractBounds<PartitionPosition> bounds;
     private final IPartitioner partitioner;
     private final KeyReader it;
     private final ReadWriteLock fileAccessLock;
@@ -34,8 +38,9 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
 
     private boolean initialized = false;
 
-    public KeyIterator(KeyReader it, IPartitioner partitioner, long totalBytes, ReadWriteLock fileAccessLock)
+    public KeyIterator(AbstractBounds<PartitionPosition> bounds, KeyReader it, IPartitioner partitioner, long totalBytes, ReadWriteLock fileAccessLock)
     {
+        this.bounds = bounds;
         this.it = it;
         this.partitioner = partitioner;
         this.totalBytes = totalBytes;
@@ -48,19 +53,27 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
             fileAccessLock.readLock().lock();
         try
         {
-            if (!initialized)
+            while (true)
             {
-                initialized = true;
-                return it.isExhausted()
-                       ? endOfData()
-                       : partitioner.decorateKey(it.key());
-            }
-            else
-            {
-                return it.advance()
-                       ? partitioner.decorateKey(it.key())
-                       : endOfData();
+                if (!initialized)
+                {
+                    initialized = true;
+                    if (it.isExhausted())
+                        break;
+                }
+                else if (!it.advance())
+                    break;
+
+                DecoratedKey key = partitioner.decorateKey(it.key());
+                if (bounds == null || bounds.contains(key))
+                    return key;
+
+                // keys arrive in order, so stop once past the right end (a minimum right bound means unbounded)
+                if (!bounds.right.isMinimum() && key.compareTo(bounds.right) >= 0)
+                    break;
             }
+
+            return endOfData();
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 716d9a8157..cfbe73d810 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -825,9 +825,15 @@ public abstract class SSTableReader extends SSTable implements UnfilteredSource,
      */
     public KeyIterator keyIterator() throws IOException
     {
-        return new KeyIterator(keyReader(), getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock());
+        return new KeyIterator(null, keyReader(), getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock());
     }
 
+    /**
+     * Returns an iterator over the keys of this sstable that fall within {@code range}. Implementations may
+     * position the underlying key reader near {@code range.left} instead of reading from the first key.
+     */
+    public abstract KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) throws IOException;
+
     /**
      * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
      */
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index b58dbc532e..b6038237bd 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -26,11 +26,12 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,16 +50,17 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.AbstractRowIndexEntry;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.Downsampling;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.IVerifier;
 import org.apache.cassandra.io.sstable.IndexInfo;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.KeyReader;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReadsListener;
 import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason;
 import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -157,6 +159,35 @@ public class BigTableReader extends SSTableReaderWithFilter implements IndexSumm
         return BigTableKeyReader.create(ifile, rowIndexEntrySerializer);
     }
 
+    /**
+     * Returns the keys of this sstable within {@code range}, starting the primary index scan at the position
+     * returned by {@code getIndexScanPosition(range.left)}; {@link KeyIterator} drops keys outside the range.
+     */
+    @Override
+    public KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) throws IOException
+    {
+        RandomAccessReader ifileReader = ifile.createReader();
+        try
+        {
+            ifileReader.seek(getIndexScanPosition(range.left));
+            BigTableKeyReader keyReader = BigTableKeyReader.create(ifileReader, rowIndexEntrySerializer);
+            return new KeyIterator(range, keyReader, getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock());
+        }
+        catch (Throwable t)
+        {
+            // don't leak the index reader if seeking or creating the key reader fails
+            try
+            {
+                ifileReader.close();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+            throw t;
+        }
+    }
+
     /**
      * Direct I/O SSTableScanner over an iterator of bounds.
      *
diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
index c5571e7fbb..2ff3c2bc89 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -43,21 +44,22 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.IVerifier;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReadsListener;
 import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason;
 import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason;
 import org.apache.cassandra.io.sstable.format.SSTableReaderWithFilter;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileHandle;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.OutputHandler;
 
 import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.EQ;
 import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GE;
@@ -487,6 +489,28 @@ public class BtiTableReader extends SSTableReaderWithFilter
         return BtiTableScanner.getScanner(this, columnFilter, dataRange, listener);
     }
 
+    /**
+     * Returns the keys of this sstable within {@code range}. The partition index iterator is positioned at
+     * {@code range.left}, honouring whether it is inclusive, and {@link KeyIterator} filters its keys to {@code range}.
+     */
+    @Override
+    public KeyIterator keyIterator(AbstractBounds<PartitionPosition> range)
+    {
+        PartitionIterator iter;
+        try
+        {
+            iter = PartitionIterator.create(partitionIndex, metadata().partitioner, rowIndexFile, dfile,
+                                            range.left, range.inclusiveLeft() ? -1 : 0,
+                                            null, 0, descriptor.version);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        return new KeyIterator(range, iter, metadata().partitioner, uncompressedLength(), new ReentrantReadWriteLock());
+    }
+
     @Override
     public IVerifier getVerifier(ColumnFamilyStore cfs, OutputHandler outputHandler, boolean isOffline, IVerifier.Options options)
     {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e8edb4d34e..97bab8ec5d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1678,10 +1678,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     public Integer finishConsensusMigration(@Nonnull String keyspace,
                                             @Nullable List<String> maybeTableNames,
                                             @Nullable String maybeRangesStr,
-                                            @Nonnull ConsensusMigrationTarget target)
+                                            @Nonnull String target)
     {
         checkArgument(!keyspace.equals(SchemaConstants.METADATA_KEYSPACE_NAME));
-        return finishMigrationToConsensusProtocol(keyspace, Optional.ofNullable(maybeTableNames), Optional.ofNullable(maybeRangesStr), target);
+        return finishMigrationToConsensusProtocol(keyspace, Optional.ofNullable(maybeTableNames), Optional.ofNullable(maybeRangesStr), ConsensusMigrationTarget.valueOf(target));
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8845a9850f..46d5de2a9e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1083,7 +1083,7 @@ public interface StorageServiceMBean extends NotificationEmitter
     Integer finishConsensusMigration(@Nonnull String keyspace,
                                      @Nullable List<String> maybeTableNames,
                                      @Nullable String maybeRangesStr,
-                                     @Nonnull ConsensusMigrationTarget target);
+                                     @Nonnull String target);
 
     String listConsensusMigrations(@Nullable Set<String> keyspaceNames, @Nullable Set<String> tableNames, @Nonnull String format);
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 8046b9d388..b32d50da86 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.service.accord;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -30,7 +29,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,12 +41,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
-import accord.local.cfk.CommandsForKey;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -58,17 +54,15 @@ import accord.local.RedundantBefore;
 import accord.local.SaveStatus;
 import accord.local.Status;
 import accord.local.Status.Durability;
+import accord.local.cfk.CommandsForKey;
 import accord.primitives.Ranges;
-import accord.primitives.Routable;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
-import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topology;
 import accord.utils.Invariants;
 import accord.utils.ReducingRangeMap;
 import accord.utils.async.Observable;
-import org.apache.cassandra.concurrent.DebuggableTask;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -86,9 +80,11 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.WriteContext;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -111,10 +107,16 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Row.Deletion;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.ExcludingBounds;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.LocalCompositePrefixPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.index.accord.RouteIndex;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -148,9 +150,11 @@ import org.apache.cassandra.service.accord.serializers.KeySerializers;
 import org.apache.cassandra.service.accord.serializers.TopologySerializers;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.Clock.Global;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static accord.utils.Invariants.checkArgument;
 import static accord.utils.Invariants.checkState;
@@ -188,15 +192,23 @@ public class AccordKeyspace
     private static final TupleType KEY_TYPE = new 
TupleType(Arrays.asList(UUIDType.instance, BytesType.instance));
     private static final String KEY_TUPLE = KEY_TYPE.asCQL3Type().toString();
 
-    // shared LocalPartitioner for all *_for_key Accord tables with (store_id, 
key_token, key) partition key
-    private static final LocalPartitioner FOR_KEYS_LOCAL_PARTITIONER =
-        new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, 
BytesType.instance, KEY_TYPE));
-
     private static final ClusteringIndexFilter FULL_PARTITION = new 
ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), 
Clustering.EMPTY), false);
 
     //TODO (now, performance): should this be partitioner rather than TableId? 
 As of this patch distributed tables should only have 1 partitioner...
     private static final ConcurrentMap<TableId, 
AccordRoutingKeyByteSource.Serializer> TABLE_SERIALIZERS = new 
ConcurrentHashMap<>();
 
+    /**
+     * Returns the routing-key serializer cached in TABLE_SERIALIZERS for the key's table, creating it on first
+     * use. Token keys take the partitioner from their token; every other key kind uses the table's partitioner
+     * from the schema.
+     */
+    private static AccordRoutingKeyByteSource.Serializer getRoutingKeySerializer(AccordRoutingKey key)
+    {
+        return TABLE_SERIALIZERS.computeIfAbsent(key.table(), unused -> {
+            IPartitioner partitioner = key.kindOfRoutingKey() == AccordRoutingKey.RoutingKeyKind.TOKEN
+                                       ? key.asTokenKey().token().getPartitioner()
+                                       : SchemaHolder.schema.getTablePartitioner(key.table());
+            return AccordRoutingKeyByteSource.variableLength(partitioner);
+        });
+    }
+
     // Schema needs all system keyspace, and this is a system keyspace!  So 
can not touch schema in init
     private static class SchemaHolder
     {
@@ -475,14 +487,13 @@ public class AccordKeyspace
               + format("last_write_timestamp %s, ", TIMESTAMP_TUPLE)
               + "PRIMARY KEY((store_id, key_token, key))"
               + ')')
-        .partitioner(FOR_KEYS_LOCAL_PARTITIONER)
+        .partitioner(new 
LocalPartitioner(CompositeType.getInstance(Int32Type.instance, 
BytesType.instance, KEY_TYPE)))
         .build();
 
     public static class TimestampsForKeyColumns
     {
         static final ClusteringComparator keyComparator = 
TimestampsForKeys.partitionKeyAsClusteringComparator();
         static final CompositeType partitionKeyType = (CompositeType) 
TimestampsForKeys.partitionKeyType;
-        static final ColumnFilter allColumns = 
ColumnFilter.all(TimestampsForKeys);
         static final ColumnMetadata store_id = getColumn(TimestampsForKeys, 
"store_id");
         static final ColumnMetadata key_token = getColumn(TimestampsForKeys, 
"key_token");
         static final ColumnMetadata key = getColumn(TimestampsForKeys, "key");
@@ -579,22 +590,24 @@ public class AccordKeyspace
         }
     }
 
+    // Partitioner for the commands-for-key table. Its component types (int, uuid, blob, blob) line up with the
+    // (store_id, table_id, key_token, key) partition key declared in commandsForKeysTable; getPrefixToken uses it
+    // to build prefix tokens over the leading components.
+    private static final LocalCompositePrefixPartitioner CFKPartitioner = new LocalCompositePrefixPartitioner(Int32Type.instance, UUIDType.instance, BytesType.instance, BytesType.instance);
     private static final TableMetadata CommandsForKeys = commandsForKeysTable(COMMANDS_FOR_KEY);
 
+    /**
+     * Builds the metadata for a commands-for-key table named {@code tableName}: one partition per
+     * (store_id, table_id, key_token, key) holding a single {@code data} blob, stored with the NoopCompressor and
+     * partitioned by CFKPartitioner so that key ranges can be expressed as prefix-token bounds.
+     */
     private static TableMetadata commandsForKeysTable(String tableName)
     {
         return parse(tableName,
-              "accord commands per key",
-              "CREATE TABLE %s ("
-              + "store_id int, "
-              + "key_token blob, " // can't use "token" as this is restricted word in CQL
-              + format("key %s, ", KEY_TUPLE)
-              + "data blob, "
-              + "PRIMARY KEY((store_id, key_token, key))"
-              + ')'
-               + " WITH compression = {'class':'NoopCompressor'};")
-        .partitioner(FOR_KEYS_LOCAL_PARTITIONER)
-        .build();
+                     "accord commands per key",
+                     "CREATE TABLE %s ("
+                     + "store_id int, "
+                     + "table_id uuid, "
+                     + "key_token blob, " // can't use "token" as this is restricted word in CQL
+                     + "key blob, "
+                     + "data blob, "
+                     + "PRIMARY KEY((store_id, table_id, key_token, key))"
+                     + ')'
+                     + " WITH compression = {'class':'NoopCompressor'};")
+               .partitioner(CFKPartitioner)
+               .build();
     }
 
     public static class CommandsForKeyAccessor
@@ -604,6 +617,7 @@ public class AccordKeyspace
         final CompositeType partitionKeyType;
         final ColumnFilter allColumns;
         final ColumnMetadata store_id;
+        final ColumnMetadata table_id;
         final ColumnMetadata key_token;
         final ColumnMetadata key;
         final ColumnMetadata data;
@@ -617,6 +631,7 @@ public class AccordKeyspace
             this.partitionKeyType = (CompositeType) table.partitionKeyType;
             this.allColumns = ColumnFilter.all(table);
             this.store_id = getColumn(table, "store_id");
+            this.table_id = getColumn(table, "table_id");
             this.key_token = getColumn(table, "key_token");
             this.key = getColumn(table, "key");
             this.data = getColumn(table, "data");
@@ -633,6 +648,11 @@ public class AccordKeyspace
             return 
Int32Type.instance.compose(partitionKeyComponents[store_id.position()]);
         }
 
+        /** Decodes the {@code table_id} component of a split CFK partition key into a {@link TableId}. */
+        public TableId getTableId(ByteBuffer[] partitionKeyComponents)
+        {
+            ByteBuffer encoded = partitionKeyComponents[table_id.position()];
+            return TableId.fromUUID(UUIDType.instance.compose(encoded));
+        }
+
         public PartitionKey getKey(DecoratedKey key)
         {
             return getKey(splitPartitionKey(key));
@@ -640,7 +660,12 @@ public class AccordKeyspace
 
         public PartitionKey getKey(ByteBuffer[] partitionKeyComponents)
         {
-            return deserializeKey(partitionKeyComponents[key.position()]);
+            TableId tableId = 
TableId.fromUUID(UUIDSerializer.instance.deserialize(partitionKeyComponents[table_id.position()]));
+            ByteBuffer keyBytes = partitionKeyComponents[key.position()];
+            IPartitioner partitioner = 
SchemaHolder.schema.getTablePartitioner(tableId);
+            if (partitioner == null)
+                throw new IllegalStateException("Table with id " + tableId + " 
could not be found; was it deleted?");
+            return new PartitionKey(tableId, 
partitioner.decorateKey(keyBytes));
         }
 
         public CommandsForKey getCommandsForKey(PartitionKey key, Row row)
@@ -652,6 +677,13 @@ public class AccordKeyspace
             return CommandsForKeySerializer.fromBytes(key, cell.buffer());
         }
 
+        /** Serializes {@code key} with its table's routing-key serializer, using the serializer's no-table form. */
+        @VisibleForTesting
+        public ByteBuffer serializeKeyNoTable(AccordRoutingKey key)
+        {
+            return ByteBuffer.wrap(getRoutingKeySerializer(key).serializeNoTable(key));
+        }
+
         // TODO (expected): garbage-free filtering, reusing encoding
         public Row withoutRedundantCommands(PartitionKey key, Row row, 
RedundantBefore.Entry redundantBefore)
         {
@@ -674,6 +706,19 @@ public class AccordKeyspace
             ByteBuffer buffer = 
CommandsForKeySerializer.toBytesWithoutKey(updated);
             return BTreeRow.singleCellRow(Clustering.EMPTY, 
BufferCell.live(data, cell.timestamp(), buffer));
         }
+
+        /**
+         * Builds the CFK prefix token used as a range boundary for {@code key} in {@code commandStore}.
+         * Token keys yield a (command store, table id, serialized token) prefix; any other key kind yields the
+         * shorter (command store, table id) prefix.
+         */
+        public LocalCompositePrefixPartitioner.AbstractCompositePrefixToken getPrefixToken(int commandStore, AccordRoutingKey key)
+        {
+            if (key.kindOfRoutingKey() != AccordRoutingKey.RoutingKeyKind.TOKEN)
+                return CFKPartitioner.createPrefixToken(commandStore, key.table().asUUID());
+
+            byte[] serializedToken = getRoutingKeySerializer(key).serializeNoTable(key);
+            return CFKPartitioner.createPrefixToken(commandStore, key.table().asUUID(), ByteBuffer.wrap(serializedToken));
+        }
     }
 
     public static final CommandsForKeyAccessor CommandsForKeysAccessor = new 
CommandsForKeyAccessor(CommandsForKeys);
@@ -940,201 +985,70 @@ public class AccordKeyspace
                                txnId.msb, txnId.lsb, txnId.node.id);
     }
 
-    private static abstract class TableWalk implements Runnable, DebuggableTask
+    /**
+     * Asynchronously emits to {@code callback} the partition key of every commands-for-key entry owned by
+     * {@code commandStore} whose routing key falls between {@code start} and {@code end}.
+     * <p>
+     * Both routing keys are converted to composite prefix tokens (sentinel keys are always treated as inclusive),
+     * and the resulting partition-position bounds are walked with the table's key iterator on the READ stage.
+     * The callback receives exactly one terminal signal: onCompleted() after the last key, or onError(...) if the
+     * walk or the callback fails before completion.
+     */
+    public static void findAllKeysBetween(int commandStore,
+                                          AccordRoutingKey start, boolean startInclusive,
+                                          AccordRoutingKey end, boolean endInclusive,
+                                          Observable<PartitionKey> callback)
     {
-        private final long creationTimeNanos = Global.nanoTime();
-        private final Executor executor;
-        private final Observable<UntypedResultSet.Row> callback;
-        private long startTimeNanos = -1;
-        private int numQueries = 0;
-        private UntypedResultSet.Row lastSeen = null;
 
-        private TableWalk(Executor executor, Observable<UntypedResultSet.Row> callback)
-        {
-            this.executor = executor;
-            this.callback = callback;
-        }
+        Token startToken = CommandsForKeysAccessor.getPrefixToken(commandStore, start);
+        Token endToken = CommandsForKeysAccessor.getPrefixToken(commandStore, end);
 
-        protected abstract UntypedResultSet query(UntypedResultSet.Row lastSeen);
+        // sentinel keys map to the bare (store, table) prefix, so that prefix is always included
+        if (start instanceof AccordRoutingKey.SentinelKey)
+            startInclusive = true;
+        if (end instanceof AccordRoutingKey.SentinelKey)
+            endInclusive = true;
 
-        public final void schedule()
-        {
-            executor.execute(this);
-        }
+        PartitionPosition startPosition = startInclusive ? startToken.minKeyBound() : startToken.maxKeyBound();
+        PartitionPosition endPosition = endInclusive ? endToken.maxKeyBound() : endToken.minKeyBound();
+        // pick the AbstractBounds subtype whose endpoint inclusivity matches the request
+        AbstractBounds<PartitionPosition> bounds;
+        if (startInclusive && endInclusive)
+            bounds = new Bounds<>(startPosition, endPosition);
+        else if (endInclusive)
+            bounds = new Range<>(startPosition, endPosition);
+        else if (startInclusive)
+            bounds = new IncludingExcludingBounds<>(startPosition, endPosition);
+        else
+            bounds = new ExcludingBounds<>(startPosition, endPosition);
 
-        @Override
-        public final void run()
-        {
-            try
-            {
-                if (startTimeNanos == -1)
-                    startTimeNanos = Global.nanoTime();
-                numQueries++;
-                UntypedResultSet result = query(lastSeen);
-                if (result.isEmpty())
-                {
-                    callback.onCompleted();
-                    return;
-                }
-                UntypedResultSet.Row lastRow = null;
-                for (UntypedResultSet.Row row : result)
-                {
-                    callback.onNext(row);
-                    lastRow = row;
-                }
-                lastSeen = lastRow;
-                schedule();
-            }
-            catch (Throwable t)
-            {
-                callback.onError(t);
-            }
-        }
-
-        @Override
-        public long creationTimeNanos()
-        {
-            return creationTimeNanos;
-        }
-
-        @Override
-        public long startTimeNanos()
-        {
-            return startTimeNanos;
-        }
-
-        @Override
-        public String description()
-        {
-            return format("Table Walker for %s; queries = %d", getClass().getSimpleName(), numQueries);
-        }
-    }
-
-    private static String selection(TableMetadata metadata, Set<String> requiredColumns, Set<String> forIteration)
-    {
-        StringBuilder selection = new StringBuilder();
-        if (requiredColumns.isEmpty())
-            selection.append("*");
-        else
-        {
-            Sets.SetView<String> other = Sets.difference(requiredColumns, forIteration);
-            for (String name : other)
-            {
-                ColumnMetadata meta = metadata.getColumn(new ColumnIdentifier(name, true));
-                if (meta == null)
-                    throw new IllegalArgumentException("Unknown column: " + name);
-            }
-            List<String> names = new ArrayList<>(forIteration.size() + other.size());
-            names.addAll(forIteration);
-            names.addAll(other);
-            // this sort is to make sure the CQL is determanistic
-            Collections.sort(names);
-            for (int i = 0; i < names.size(); i++)
-            {
-                if (i > 0)
-                    selection.append(", ");
-                selection.append(names.get(i));
-            }
-        }
-        return selection.toString();
-    }
-
-    private static class WalkCommandsForDomain extends TableWalk
-    {
-        private static final Set<String> COLUMNS_FOR_ITERATION = ImmutableSet.of("txn_id", "store_id", "domain");
-        private final String cql;
-        private final int storeId, domain;
-
-        private WalkCommandsForDomain(int commandStore, Routable.Domain domain, Set<String> requiredColumns, Executor executor, Observable<UntypedResultSet.Row> callback)
-        {
-            super(executor, callback);
-            this.storeId = commandStore;
-            this.domain = domain.ordinal();
-            cql = format("SELECT %s " +
-                                "FROM %s " +
-                                "WHERE store_id = ? " +
-                                "      AND domain = ? " +
-                                "      AND token(store_id, domain, txn_id) > token(?, ?, (?, ?, ?)) " +
-                                "ALLOW FILTERING", selection(Commands, requiredColumns, COLUMNS_FOR_ITERATION), Commands);
-        }
-
-        @Override
-        protected UntypedResultSet query(UntypedResultSet.Row lastSeen)
-        {
-            TxnId lastTxnId = lastSeen == null ?
-                              new TxnId(0, 0, Txn.Kind.Read, Routable.Domain.Key, Node.Id.NONE)
-                              : deserializeTxnId(lastSeen);
-            return executeInternal(cql, storeId, domain, storeId, domain, lastTxnId.msb, lastTxnId.lsb, lastTxnId.node.id);
-        }
-    }
-
-    public static void findAllKeysBetween(int commandStore,
-                                          AccordRoutingKey start, boolean startInclusive,
-                                          AccordRoutingKey end, boolean endInclusive,
-                                          Observable<PartitionKey> callback)
-    {
-        //TODO (optimize) : CQL doesn't look smart enough to only walk Index.db, and ends up walking the Data.db file for each row in the partitions found (for frequent keys, this cost adds up)
-        // it would be possible to find all SSTables that "could" intersect this range, then have a merge iterator over the Index.db (filtered to the range; index stores partition liveness)...
-        KeysBetween work = new KeysBetween(commandStore,
-                                           AccordKeyspace.serializeRoutingKey(start), startInclusive,
-                                           AccordKeyspace.serializeRoutingKey(end), endInclusive,
-                                           ImmutableSet.of("key"),
-                                           Stage.READ.executor(), Observable.distinct(callback).map(AccordKeyspace::deserializeKey));
-        work.schedule();
-    }
-
-    private static class KeysBetween extends TableWalk
-    {
-        private static final Set<String> COLUMNS_FOR_ITERATION = ImmutableSet.of("store_id", "key_token");
-
-        private final int storeId;
-        private final ByteBuffer start, end;
-        private final String cqlFirst;
-        private final String cqlContinue;
-
-        private KeysBetween(int storeId,
-                            ByteBuffer start, boolean startInclusive,
-                            ByteBuffer end, boolean endInclusive,
-                            Set<String> requiredColumns,
-                            Executor executor, Observable<UntypedResultSet.Row> callback)
-        {
-            super(executor, callback);
-            this.storeId = storeId;
-            this.start = start;
-            this.end = end;
-
-            String selection = selection(CommandsForKeys, requiredColumns, COLUMNS_FOR_ITERATION);
-            this.cqlFirst = format("SELECT DISTINCT %s\n" +
-                                          "FROM %s\n" +
-                                          "WHERE store_id = ?\n" +
-                                          (startInclusive ? "  AND key_token >= ?\n" : "  AND key_token > ?\n") +
-                                          (endInclusive ? "  AND key_token <= ?\n" : "  AND key_token < ?\n") +
-                                          "ALLOW FILTERING",
-                                          selection, CommandsForKeys);
-            this.cqlContinue = format("SELECT DISTINCT %s\n" +
-                                             "FROM %s\n" +
-                                             "WHERE store_id = ?\n" +
-                                             "  AND key_token > ?\n" +
-                                             "  AND key > ?\n" +
-                                             (endInclusive ? "  AND key_token <= ?\n" : "  AND key_token < ?\n") +
-                                             "ALLOW FILTERING",
-                                             selection, CommandsForKeys);
-        }
-
-        @Override
-        protected UntypedResultSet query(UntypedResultSet.Row lastSeen)
-        {
-            if (lastSeen == null)
-            {
-                return executeInternal(cqlFirst, storeId, start, end);
-            }
-            else
-            {
-                ByteBuffer previousToken = lastSeen.getBytes("key_token");
-                ByteBuffer previousKey = lastSeen.getBytes("key");
-                return executeInternal(cqlContinue, storeId, previousToken, previousKey, end);
-            }
-        }
-    }
+        Stage.READ.executor().submit(() -> walkKeys(bounds, callback));
+    }
+
+    /**
+     * Walks the commands-for-key partition keys within {@code bounds} and emits each to {@code callback} as a
+     * {@link PartitionKey}.
+     * <p>
+     * Exactly one terminal signal is sent to {@code callback}:
+     * <ul>
+     * <li>onCompleted() once every key has been emitted, or</li>
+     * <li>onError(...) if something fails first: opening the table, acquiring the read order or write context,
+     * creating/iterating the key iterator, decoding a key, or the callback's own onNext().</li>
+     * </ul>
+     * A failure that cannot be delivered this way is rethrown to the executor rather than dropped. This covers a
+     * failure raised by onCompleted() itself or while closing resources after it, and a failure where onError()
+     * itself threw.
+     */
+    private static void walkKeys(AbstractBounds<PartitionPosition> bounds, Observable<PartitionKey> callback)
+    {
+        boolean completed = false;
+        try
+        {
+            ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(CommandsForKeys);
+            try (OpOrder.Group baseOp = baseCfs.readOrdering.start();
+                 WriteContext writeContext = baseCfs.keyspace.getWriteHandler().createContextForRead();
+                 CloseableIterator<DecoratedKey> iter = LocalCompositePrefixPartitioner.keyIterator(CommandsForKeys, bounds))
+            {
+                while (iter.hasNext())
+                    callback.onNext(CommandsForKeysAccessor.getKey(iter.next()));
+                // mark completion before signalling so that a failure in onCompleted(), or while closing the
+                // resources afterwards, is not reported as a second terminal signal
+                completed = true;
+                callback.onCompleted();
+            }
+        }
+        catch (Throwable t)
+        {
+            if (!completed)
+            {
+                try
+                {
+                    callback.onError(t);
+                    return; // failure delivered to the callback
+                }
+                catch (Throwable callbackFailure)
+                {
+                    t.addSuppressed(callbackFailure);
+                }
+            }
+            // the callback could not be told about this failure; surface it rather than dropping it
+            if (t instanceof RuntimeException)
+                throw (RuntimeException) t;
+            if (t instanceof Error)
+                throw (Error) t;
+            throw new RuntimeException(t);
+        }
+    }
 
     public static TxnId deserializeTxnId(UntypedResultSet.Row row)
@@ -1272,26 +1186,24 @@ public class AccordKeyspace
+    /**
+     * Builds the decorated partition key for {@code key} in command store {@code storeId} using the accessor's
+     * table. The composite components are, in order: the store id, the table id (as a serialized UUID), the
+     * serialized routing key, and the raw partition key bytes. This order is intended to match the
+     * (store_id, table_id, key_token, key) partition key of the commands-for-key table.
+     */
     private static DecoratedKey makeKey(CommandsForKeyAccessor accessor, int storeId, PartitionKey key)
     {
         ByteBuffer pk = accessor.keyComparator.make(storeId,
+                                                    UUIDSerializer.instance.serialize(key.table().asUUID()),
                                                     serializeRoutingKey(key.toUnseekable()),
-                                                    serializeKey(key)).serializeAsPartitionKey();
+                                                    key.partitionKey().getKey()).serializeAsPartitionKey();
         return accessor.table.partitioner.decorateKey(pk);
     }
 
     @VisibleForTesting
     public static ByteBuffer serializeRoutingKey(AccordRoutingKey routingKey)
     {
-        AccordRoutingKeyByteSource.Serializer serializer = 
TABLE_SERIALIZERS.computeIfAbsent(routingKey.table(), ignore -> {
-            IPartitioner partitioner;
-            if (routingKey.kindOfRoutingKey() == 
AccordRoutingKey.RoutingKeyKind.TOKEN)
-                partitioner = routingKey.asTokenKey().token().getPartitioner();
-            else
-                partitioner = 
SchemaHolder.schema.getTablePartitioner(routingKey.table());
-            return AccordRoutingKeyByteSource.variableLength(partitioner);
-        });
-        byte[] bytes = serializer.serialize(routingKey);
+        byte[] bytes = 
getRoutingKeySerializer(routingKey).serialize(routingKey);
         return ByteBuffer.wrap(bytes);
     }
 
+    /** Serializes {@code key} in its routing-key serializer's no-table form. */
+    public static ByteBuffer serializeRoutingKeyNoTable(AccordRoutingKey key)
+    {
+        byte[] serialized = getRoutingKeySerializer(key).serializeNoTable(key);
+        return ByteBuffer.wrap(serialized);
+    }
+
     private static PartitionUpdate getCommandsForKeyPartitionUpdate(int 
storeId, PartitionKey key, CommandsForKey commandsForKey, long timestampMicros)
     {
         ByteBuffer bytes = 
CommandsForKeySerializer.toBytesWithoutKey(commandsForKey);
@@ -1475,7 +1387,7 @@ public class AccordKeyspace
 
     /**
      * Update the disk state for this epoch, if it's higher than the one we 
have one disk.
-     *
+     * <p>
      * This is meant to be called before any update involving the new epoch, 
not after. This way if the update
      * fails, we can detect and cleanup. If we updated disk state after an 
update and it failed, we could "forget"
      * about (now acked) topology updates after a restart.
@@ -1624,7 +1536,6 @@ public class AccordKeyspace
         Ranges redundant = row.has("redundant") ? 
blobMapToRanges(row.getMap("redundant", BytesType.instance, 
BytesType.instance)) : Ranges.EMPTY;
 
         consumer.load(epoch, topology, syncStatus, pendingSyncNotify, 
remoteSyncComplete, closed, redundant);
-
     }
 
     public static EpochDiskState loadTopologies(TopologyLoadConsumer consumer)
@@ -1710,8 +1621,8 @@ public class AccordKeyspace
+    /**
+     * Callback through which loadCommandStoreMetadata hands over the stored metadata of a single command store.
+     */
     public interface CommandStoreMetadataConsumer
     {
         void accept(ReducingRangeMap<Timestamp> rejectBefore, DurableBefore durableBefore, RedundantBefore redundantBefore, NavigableMap<TxnId, Ranges> bootstrapBeganAt, NavigableMap<Timestamp, Ranges> safeToRead);
-
     }
+
     public static void loadCommandStoreMetadata(int id, 
CommandStoreMetadataConsumer consumer)
     {
         UntypedResultSet result = executeOnceInternal(format("SELECT * FROM 
%s.%s WHERE store_id=?", ACCORD_KEYSPACE_NAME, COMMAND_STORE_METADATA), id);
@@ -1758,5 +1669,4 @@ public class AccordKeyspace
         TABLE_SERIALIZERS.clear();
         SchemaHolder.schema = Schema.instance;
     }
-
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java 
b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
index 18fb045475..51497792cd 100644
--- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
+++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java
@@ -124,10 +124,11 @@ public class AccordRepair
         List<accord.primitives.Range> repairedRanges = new ArrayList<>();
         int rangeStepUpdateInterval = 
ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL.getInt();
         RoutingKey remainingStart = range.start();
+        // TODO (expected): repair ranges should have a configurable lower 
limit of split size so already small repairs aren't broken up into excessively 
tiny ones
         BigInteger rangeSize = splitter.sizeOf(range);
         if (rangeStep == null)
         {
-            BigInteger divide = splitter.divide(rangeSize, 1000);
+            BigInteger divide = splitter.divide(rangeSize, 10000);
             rangeStep = divide.equals(BigInteger.ZERO) ? rangeSize : 
BigInteger.ONE.max(divide);
         }
 
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java 
b/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java
index e7e1fd9cfc..7eea5a021b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java
@@ -119,7 +119,7 @@ public abstract class ConsensusMigrationAdmin extends 
NodeTool.NodeToolCmd
             @Override
             public Integer start()
             {
-                return probe.getStorageService().finishConsensusMigration(keyspace, maybeTableNames, maybeRangesStr, target);
+                // NOTE(review): target is now passed in its toString() form; this throws a NullPointerException
+                // if target can be unset — confirm it is a required option.
+                return probe.getStorageService().finishConsensusMigration(keyspace, maybeTableNames, maybeRangesStr, target.toString());
             }
         }
 
diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java 
b/src/java/org/apache/cassandra/utils/MergeIterator.java
index 1dd1f7833b..7ffb55a2db 100644
--- a/src/java/org/apache/cassandra/utils/MergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/MergeIterator.java
@@ -421,6 +421,23 @@ public abstract class MergeIterator<In,Out> extends 
AbstractIterator<Out> implem
     /** Accumulator that collects values of type A, and outputs a value of 
type B. */
     public static abstract class Reducer<In,Out>
     {
+        /**
+         * Pass-through reducer: for each key it outputs the last value given to {@code reduce}, and it forgets
+         * that value on every key change. It reports {@code trivialReduceIsTrivial() == true}, since its output
+         * type is its input type.
+         */
+        public static class Trivial<T> extends Reducer<T, T>
+        {
+            private T last;
+
+            @Override
+            public boolean trivialReduceIsTrivial()
+            {
+                return true;
+            }
+
+            @Override
+            public void reduce(int idx, T current)
+            {
+                last = current;
+            }
+
+            @Override
+            protected T getReduced()
+            {
+                return last;
+            }
+
+            @Override
+            protected void onKeyChange()
+            {
+                last = null;
+            }
+        }
+
         /**
          * @return true if Out is the same as In for the case of a single 
source iterator
          */
diff --git 
a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
 
b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
index 710a426647..743583e362 100644
--- 
a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
+++ 
b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java
@@ -249,6 +249,12 @@ public abstract class ForwardingSSTableReader extends 
SSTableReader
         return delegate.keyIterator();
     }
 
+    /** Delegates range-restricted key iteration to the wrapped reader. */
+    @Override
+    public KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) throws IOException
+    {
+        KeyIterator delegated = delegate.keyIterator(range);
+        return delegated;
+    }
+
     @Override
     public DecoratedKey firstKeyBeyond(PartitionPosition token)
     {
diff --git a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java 
b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
index 35ecfab60d..d498323e85 100644
--- a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
+++ b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java
@@ -233,6 +233,8 @@ public class AbstractTypeTest
                 continue;
             if (isTestType(klass))
                 continue;
+            if (isPrefixCompositeType(klass))
+                continue;
             String name = klass.getCanonicalName();
             if (name == null)
                 name = klass.getName();
@@ -259,6 +261,13 @@ public class AbstractTypeTest
         return "test".equals(new File(src.getLocation().getPath()).name());
     }
 
+    /**
+     * Returns true if {@code klass} is one of the prefix composite types, which the caller skips.
+     * Falls back to the binary name because {@link Class#getCanonicalName()} returns null for local and
+     * anonymous classes, which would otherwise cause a NullPointerException.
+     */
+    @SuppressWarnings("rawtypes")
+    private boolean isPrefixCompositeType(Class<? extends AbstractType> klass)
+    {
+        String name = klass.getCanonicalName();
+        if (name == null)
+            name = klass.getName();
+        return name.contains("PrefixCompositeType");
+    }
+
     @Test
     public void unsafeSharedSerializer()
     {
diff --git a/test/unit/org/apache/cassandra/dht/IPartitionerTest.java 
b/test/unit/org/apache/cassandra/dht/IPartitionerTest.java
index 5e46f09ed6..e531e25da5 100644
--- a/test/unit/org/apache/cassandra/dht/IPartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/IPartitionerTest.java
@@ -18,21 +18,78 @@
 
 package org.apache.cassandra.dht;
 
+import java.lang.reflect.Modifier;
+import java.security.CodeSource;
+import java.security.ProtectionDomain;
 import java.util.Objects;
+import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.junit.Test;
 
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.utils.AbstractTypeGenerators;
 import org.apache.cassandra.utils.AccordGenerators;
 import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
 import org.assertj.core.api.Assertions;
+import org.reflections.Reflections;
+import org.reflections.scanners.Scanners;
+import org.reflections.util.ConfigurationBuilder;
 
 import static accord.utils.Property.qt;
 
 public class IPartitionerTest
 {
+    //TODO (now, maintenance): this is copied from AbstractTypeTest
+    // Sub-type scanner over org.apache.cassandra; allCovered uses it to discover every IPartitioner implementation.
+    private static final Reflections reflections = new Reflections(new ConfigurationBuilder()
+                                                                   .forPackage("org.apache.cassandra")
+                                                                   .setScanners(Scanners.SubTypes)
+                                                                   .setExpandSuperTypes(true)
+                                                                   .setParallel(true));
+
+    /**
+     * Fails if a concrete, non-test IPartitioner implementation found by the classpath scan has no generator in
+     * CassandraGenerators.knownPartitioners(). ReversedLongLocalPartitioner is deliberately exempt.
+     */
+    @Test
+    public void allCovered()
+    {
+        Set<Class<? extends IPartitioner>> discovered = reflections.getSubTypesOf(IPartitioner.class);
+        Set<Class<? extends IPartitioner>> generated = CassandraGenerators.knownPartitioners();
+        StringBuilder uncovered = new StringBuilder();
+        for (Class<? extends IPartitioner> klass : Sets.difference(discovered, generated))
+        {
+            boolean exempt = Modifier.isAbstract(klass.getModifiers())
+                             || isTestType(klass)
+                             || ReversedLongLocalPartitioner.class.equals(klass);
+            if (exempt)
+                continue;
+            String name = klass.getCanonicalName() != null ? klass.getCanonicalName() : klass.getName();
+            uncovered.append(name).append('\n');
+        }
+        if (uncovered.length() != 0)
+            throw new AssertionError("Uncovered types:\n" + uncovered);
+    }
+
+    /**
+     * Returns true if {@code klass} should be treated as test-only. That is the case when its name contains
+     * "Test", when it is LengthPartitioner, or when the file name of its code source location is "test".
+     * Classes without a protection domain, code source or location are treated as non-test.
+     */
+    private boolean isTestType(Class<? extends IPartitioner> klass)
+    {
+        // getCanonicalName() is null for local/anonymous classes; getName() never returns null
+        String name = klass.getCanonicalName();
+        if (name == null)
+            name = klass.getName();
+        if (name.contains("Test"))
+            return true;
+        if (name.equals(LengthPartitioner.class.getCanonicalName()))
+            return true;
+        ProtectionDomain domain = klass.getProtectionDomain();
+        if (domain == null) return false;
+        CodeSource src = domain.getCodeSource();
+        // CodeSource#getLocation() may return null when no URL was supplied for the code source
+        if (src == null || src.getLocation() == null) return false;
+        return "test".equals(new File(src.getLocation().getPath()).name());
+    }
+
     @Test
     public void byteCompareSerde()
     {
diff --git 
a/test/unit/org/apache/cassandra/dht/LocalCompositePrefixPartitionerTest.java 
b/test/unit/org/apache/cassandra/dht/LocalCompositePrefixPartitionerTest.java
new file mode 100644
index 0000000000..565e5a59fd
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/dht/LocalCompositePrefixPartitionerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import com.google.common.collect.Lists;
+import org.apache.cassandra.CassandraTestBase;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
+import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
+
+public class LocalCompositePrefixPartitionerTest extends CassandraTestBase
+{
+    private static final String KEYSPACE = "ks";
+
+    @BeforeClass
+    public static void setupClass()
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    /**
+     * Parses {@code cql}, a CREATE TABLE statement with {@code %s} in place of the table name, into a table
+     * builder for {@code keyspace} with gc_grace_seconds set to 90 days.
+     */
+    private static TableMetadata.Builder parse(String keyspace, String name, String cql)
+    {
+        // resolve the table in the keyspace the caller passed rather than a hard-coded one
+        return CreateTableStatement.parse(format(cql, name), keyspace)
+                                   .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(90));
+    }
+
+    private static LocalCompositePrefixPartitioner partitioner(AbstractType<?>... types)
+    {
+        return new LocalCompositePrefixPartitioner(types);
+    }
+
+    /** Inserts the row (p1, p2, p3, v = 0); {@code p2Hex} is a CQL blob literal such as {@code 0x00}. */
+    private static void insert(String keyspace, String table, int p1, String p2Hex, int p3)
+    {
+        executeOnceInternal(format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (%d, %s, %d, 0)", keyspace, table, p1, p2Hex, p3));
+    }
+
+    private static void assertKeysMatch(Iterable<DecoratedKey> expected, Iterator<DecoratedKey> actual)
+    {
+        List<DecoratedKey> expectedList = Lists.newArrayList(expected);
+        List<DecoratedKey> actualList = Lists.newArrayList(actual);
+        Assert.assertEquals(expectedList, actualList);
+    }
+
+    /**
+     * Checks keyIterator against Range, Bounds and IncludingExcludingBounds built from prefix tokens that
+     * cover only the first two of the three partition key components.
+     */
+    @Test
+    public void keyIteratorTest() throws Throwable
+    {
+        String tableName = "tbl";
+        LocalCompositePrefixPartitioner partitioner = partitioner(Int32Type.instance, BytesType.instance, Int32Type.instance);
+        TableMetadata metadata = parse(KEYSPACE, tableName,
+                                       "CREATE TABLE %s (" +
+                                       "p1 int," +
+                                       "p2 blob," +
+                                       "p3 int," +
+                                       "v int," +
+                                       "PRIMARY KEY ((p1, p2, p3))" +
+                                       ")").partitioner(partitioner).build();
+        SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.local(), metadata);
+
+        insert(KEYSPACE, tableName, 1, "0x00", 5);
+        insert(KEYSPACE, tableName, 1, "0x0000", 5);
+        insert(KEYSPACE, tableName, 2, "0x00", 5);
+        insert(KEYSPACE, tableName, 2, "0x0100", 5);
+        insert(KEYSPACE, tableName, 2, "0x02", 5);
+        insert(KEYSPACE, tableName, 2, "0x02", 6);
+
+        Token startToken = partitioner.createPrefixToken(1, hexToBytes("0000"));
+        Token endToken1 = partitioner.createPrefixToken(2, hexToBytes("0100"));
+        Token endToken2 = partitioner.createPrefixToken(2, hexToBytes("02"));
+
+        // keys after prefix (1, 0x0000), up to and including prefix (2, 0x0100)
+        assertKeysMatch(List.of(partitioner.decoratedKey(2, hexToBytes("00"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("0100"), 5)),
+                        partitioner.keyIterator(metadata, new Range<>(startToken.maxKeyBound(), endToken1.maxKeyBound())));
+
+        // keys from prefix (1, 0x0000) through prefix (2, 0x02), both inclusive
+        assertKeysMatch(List.of(partitioner.decoratedKey(1, hexToBytes("0000"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("00"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("0100"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("02"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("02"), 6)),
+                        partitioner.keyIterator(metadata, new Bounds<>(startToken.minKeyBound(), endToken2.maxKeyBound())));
+
+        // same start, but prefix (2, 0x02) is excluded
+        assertKeysMatch(List.of(partitioner.decoratedKey(1, hexToBytes("0000"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("00"), 5),
+                                partitioner.decoratedKey(2, hexToBytes("0100"), 5)),
+                        partitioner.keyIterator(metadata, new IncludingExcludingBounds<>(startToken.minKeyBound(), endToken2.minKeyBound())));
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
index 717cda1de3..02d823cfd4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.ReversedLongLocalPartitioner;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.MemtableParams;
 import org.apache.cassandra.schema.Schema;
@@ -183,8 +184,13 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
                 {
                     TableId tableId = 
rs.pickOrderedSet(tables.navigableKeySet());
                     IPartitioner partitioner = tables.get(tableId);
-                    ByteBuffer data = !(partitioner instanceof 
LocalPartitioner) ? Int32Type.instance.decompose(rs.nextInt())
-                                                                               
  : fromQT(getTypeSupport(partitioner.getTokenValidator()).bytesGen()).next(rs);
+                    ByteBuffer data;
+                    if (partitioner instanceof ReversedLongLocalPartitioner)
+                        data = 
fromQT(CassandraGenerators.reversedLongLocalKeys()).next(rs);
+                    else if (partitioner instanceof LocalPartitioner)
+                        data = 
fromQT(getTypeSupport(partitioner.getTokenValidator()).bytesGen()).next(rs);
+                    else
+                        data = Int32Type.instance.decompose(rs.nextInt());
                     PartitionKey key = new PartitionKey(tableId, 
tables.get(tableId).decorateKey(data));
                     if (keys.add(key))
                     {
@@ -200,8 +206,8 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
                         // The memtable will allow the write, but it will be 
dropped when writing to the SSTable...
                         //TODO (now, correctness): since we store the user 
token + user key, if a key is close to the PK limits then we could tip over and 
loose our CFK
 //                        new 
Mutation(AccordKeyspace.getCommandsForKeyPartitionUpdate(store, pk, 42, 
ByteBufferUtil.EMPTY_BYTE_BUFFER)).apply();
-                        execute("INSERT INTO system_accord.commands_for_key 
(store_id, key_token, key) VALUES (?, ?, ?)",
-                                store, 
AccordKeyspace.serializeRoutingKey(pk.toUnseekable()), 
AccordKeyspace.serializeKey(pk));
+                        execute("INSERT INTO system_accord.commands_for_key 
(store_id, table_id, key_token, key) VALUES (?, ?, ?, ?)",
+                                store, pk.table().asUUID(), 
AccordKeyspace.serializeRoutingKeyNoTable(pk.toUnseekable()), 
pk.partitionKey().getKey());
                     }
                     catch (IllegalArgumentException | InvalidRequestException 
e)
                     {
@@ -235,17 +241,21 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
                     for (var e : storesToKeys.entrySet())
                     {
                         int store = e.getKey();
-                        expectedCqlStoresToKeys.put(store, new 
TreeSet<>(e.getValue().stream().map(p -> 
AccordKeyspace.serializeRoutingKey(p.toUnseekable())).collect(Collectors.toList())));
+                        SortedSet<PartitionKey> keys = e.getValue();
+                        if (keys.isEmpty())
+                            continue;
+                        expectedCqlStoresToKeys.put(store, new 
TreeSet<>(keys.stream().map(p -> 
AccordKeyspace.serializeRoutingKeyNoTable(p.toUnseekable())).collect(Collectors.toList())));
                     }
 
                     // make sure no data loss... when this test was written 
sstable had all the rows but the sstable didn't... this
                     // is mostly a santity check to detect that case early
-                    var resultSet = execute("SELECT store_id, key_token FROM 
system_accord.commands_for_key ALLOW FILTERING");
+                    var resultSet = execute("SELECT store_id, table_id, 
key_token FROM system_accord.commands_for_key ALLOW FILTERING");
                     TreeMap<Integer, SortedSet<ByteBuffer>> cqlStoresToKeys = 
new TreeMap<>();
                     for (var row : resultSet)
                     {
                         int storeId = row.getInt("store_id");
                         ByteBuffer bb = row.getBytes("key_token");
+                        // FIXME: include table_id
                         cqlStoresToKeys.computeIfAbsent(storeId, ignore -> new 
TreeSet<>()).add(bb);
                     }
                     
Assertions.assertThat(cqlStoresToKeys).isEqualTo(expectedCqlStoresToKeys);
@@ -255,6 +265,8 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
                 {
                     int store = 
rs.pickOrderedSet(storesToKeys.navigableKeySet());
                     var keysForStore = new 
ArrayList<>(storesToKeys.get(store));
+                    if (keysForStore.isEmpty())
+                        continue;
 
                     int offset;
                     int offsetEnd;
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index e827bb33d6..cf23494d84 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -70,7 +70,7 @@ import static 
org.apache.cassandra.service.accord.AccordTestUtils.createPartialT
 
 public class AccordGenerators
 {
-    private static final Gen<IPartitioner> PARTITIONER_GEN = 
fromQT(CassandraGenerators.partitioners());
+    private static final Gen<IPartitioner> PARTITIONER_GEN = 
fromQT(CassandraGenerators.nonLocalPartitioners());
 
     private AccordGenerators()
     {
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java 
b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
index 2294183191..f45d05b7ce 100644
--- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@ -37,6 +37,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 
@@ -57,11 +58,13 @@ import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.LocalCompositePrefixPartitioner;
 import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.ReversedLongLocalPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
@@ -609,11 +612,21 @@ public final class CassandraGenerators
         return rs -> partitioner.getToken(bytes.generate(rs));
     }
 
-    public static Gen<IPartitioner> localPartitioner()
+    /** Generates {@link LocalPartitioner}s keyed by a random type from {@link AbstractTypeGenerators#safeTypeGen()}. */
+    public static Gen<LocalPartitioner> localPartitioner()
     {
         return AbstractTypeGenerators.safeTypeGen().map(LocalPartitioner::new);
     }
 
+    /**
+     * Generates {@link LocalCompositePrefixPartitioner}s from a random type of {@link AbstractTypeGenerators#safeTypeGen()}:
+     * a {@link CompositeType} goes through the CompositeType constructor, any other type is passed on its own.
+     */
+    public static Gen<LocalCompositePrefixPartitioner> localCompositePrefixPartitioner()
+    {
+        return AbstractTypeGenerators.safeTypeGen().map(type -> type instanceof CompositeType
+                                                                ? new LocalCompositePrefixPartitioner((CompositeType) type)
+                                                                : new LocalCompositePrefixPartitioner(type));
+    }
+
     public static Gen<Token> localPartitionerToken()
     {
         var lpGen = localPartitioner();
@@ -624,6 +637,31 @@ public final class CassandraGenerators
         };
     }
 
+    /**
+     * Generates tokens, each drawn from a freshly generated {@link LocalCompositePrefixPartitioner}; successive
+     * samples may therefore belong to different partitioners.
+     */
+    public static Gen<Token> localCompositePrefixPartitionerToken()
+    {
+        var lpGen = localCompositePrefixPartitioner();
+        return rs -> localCompositePrefixPartitionerToken(lpGen.generate(rs)).generate(rs);
+    }
+
+    /**
+     * Generates tokens that belong to {@code partitioner}, built from random values of its token validator type.
+     * Mirrors {@code localPartitionerToken(LocalPartitioner)} so callers holding a concrete partitioner get
+     * tokens for that partitioner rather than an unrelated one.
+     */
+    public static Gen<Token> localCompositePrefixPartitionerToken(LocalCompositePrefixPartitioner partitioner)
+    {
+        var bytes = AbstractTypeGenerators.getTypeSupport(partitioner.getTokenValidator()).bytesGen();
+        return rs -> partitioner.getToken(bytes.generate(rs));
+    }
+
+    /** Generates {@link ReversedLongLocalPartitioner.ReversedLongLocalToken}s over longs in [0, Long.MAX_VALUE]. */
+    public static Gen<Token> reversedLongLocalToken()
+    {
+        Constraint nonNegative = Constraint.between(0, Long.MAX_VALUE);
+        return rs -> {
+            long value = rs.next(nonNegative);
+            return new ReversedLongLocalPartitioner.ReversedLongLocalToken(value);
+        };
+    }
+
+    /** Generates keys for ReversedLongLocalPartitioner: longs in [0, Long.MAX_VALUE] serialized via ByteBufferUtil.bytes(long). */
+    public static Gen<ByteBuffer> reversedLongLocalKeys()
+    {
+        Constraint nonNegative = Constraint.between(0, Long.MAX_VALUE);
+        return rs -> {
+            long sampled = rs.next(nonNegative);
+            ByteBuffer encoded = ByteBufferUtil.bytes(sampled);
+            return encoded;
+        };
+    }
+
     public static Gen<Token> orderPreservingToken()
     {
         // empty token only happens if partition key is byte[0], which isn't 
allowed
@@ -640,23 +678,39 @@ public final class CassandraGenerators
 
+    /**
+     * Partitioners the generators in this class can produce, each paired with its concrete class so that
+     * {@link #knownPartitioners()} can report which {@link IPartitioner} implementations are covered.
+     */
     private enum SupportedPartitioners
     {
-        Murmur(ignore -> Murmur3Partitioner.instance),
-        ByteOrdered(ignore -> ByteOrderedPartitioner.instance),
-        Random(ignore -> RandomPartitioner.instance),
-        Local(localPartitioner()),
-        OrderPreserving(ignore -> OrderPreservingPartitioner.instance);
+        Murmur(Murmur3Partitioner.class,                                ignore 
-> Murmur3Partitioner.instance),
+        ByteOrdered(ByteOrderedPartitioner.class,                       ignore 
-> ByteOrderedPartitioner.instance),
+        Random(RandomPartitioner.class,                                 ignore 
-> RandomPartitioner.instance),
+        Local(LocalPartitioner.class,                                   
localPartitioner()),
+        OrderPreserving(OrderPreservingPartitioner.class,               ignore 
-> OrderPreservingPartitioner.instance),
+        LocalCompositePrefix(LocalCompositePrefixPartitioner.class,     
localCompositePrefixPartitioner());

-        private final Gen<IPartitioner> partitioner;
+        private final Class<? extends IPartitioner> clazz;
+        private final Gen<? extends IPartitioner> partitioner;

-        SupportedPartitioners(Gen<IPartitioner> partitionerGen)
+        // the shared type parameter T forces the declared class and the generator's element type to agree
+        <T extends IPartitioner> SupportedPartitioners(Class<T> clazz, Gen<T> 
partitionerGen)
         {
+            this.clazz = clazz;
             partitioner = partitionerGen;
         }

-        public Gen<IPartitioner> partitioner()
+        public Gen<? extends IPartitioner> partitioner()
         {
             return partitioner;
         }
+
+        /** Returns the set of concrete partitioner classes declared by the constants of this enum. */
+        public static Set<Class<? extends IPartitioner>> knownPartitioners()
+        {
+            ImmutableSet.Builder<Class<? extends IPartitioner>> builder = 
ImmutableSet.builder();
+            for (SupportedPartitioners p : values())
+                builder.add(p.clazz);
+            return builder.build();
+        }
+    }
+
+    /** Returns the concrete {@link IPartitioner} classes of every {@code SupportedPartitioners} constant. */
+    public static Set<Class<? extends IPartitioner>> knownPartitioners()
+    {
+        return SupportedPartitioners.knownPartitioners();
     }
 
     public static Gen<IPartitioner> partitioners()
@@ -665,10 +719,12 @@ public final class CassandraGenerators
                         .flatMap(SupportedPartitioners::partitioner);
     }
 
+
+    /**
+     * Generates partitioners from {@code SupportedPartitioners}, discarding the local ones
+     * ({@code Local} and {@code LocalCompositePrefix}).
+     */
     public static Gen<IPartitioner> nonLocalPartitioners()
     {
         return SourceDSL.arbitrary().enumValues(SupportedPartitioners.class)
-                        .assuming(p -> p != SupportedPartitioners.Local)
+                        .assuming(p -> p != SupportedPartitioners.Local &&
+                                       p != 
SupportedPartitioners.LocalCompositePrefix)
                         .flatMap(SupportedPartitioners::partitioner);
     }
 
@@ -682,6 +738,7 @@ public final class CassandraGenerators
         if (partitioner instanceof Murmur3Partitioner) return murmurToken();
         if (partitioner instanceof ByteOrderedPartitioner) return 
byteOrderToken();
         if (partitioner instanceof RandomPartitioner) return 
randomPartitionerToken();
+        if (partitioner instanceof LocalCompositePrefixPartitioner) return 
localCompositePrefixPartitionerToken();
         if (partitioner instanceof LocalPartitioner) return 
localPartitionerToken((LocalPartitioner) partitioner);
         if (partitioner instanceof OrderPreservingPartitioner) return 
orderPreservingToken();
         throw new UnsupportedOperationException("Unsupported partitioner: " + 
partitioner.getClass());
@@ -842,6 +899,10 @@ public final class CassandraGenerators
                 LocalPartitioner lp = (LocalPartitioner) partitioner;
                 valueGen = 
AbstractTypeGenerators.getTypeSupport(lp.getTokenValidator()).bytesGen();
             }
+            else if (partitioner instanceof ReversedLongLocalPartitioner)
+            {
+                valueGen = reversedLongLocalKeys();
+            }
             return partitioner.decorateKey(valueGen.generate(rs));
         };
     }
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGeneratorsTest.java 
b/test/unit/org/apache/cassandra/utils/CassandraGeneratorsTest.java
new file mode 100644
index 0000000000..b4e037ef3a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/CassandraGeneratorsTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.junit.Test;
+
+import accord.utils.Gens;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+import static org.apache.cassandra.utils.Generators.toGen;
+
+public class CassandraGeneratorsTest
+{
+    /** Every partitioner from {@link CassandraGenerators#partitioners()} must yield a non-null token. */
+    @Test
+    public void partitionerToToken()
+    {
+        qt().forAll(Gens.random(), toGen(CassandraGenerators.partitioners()))
+            .check((rs, partitioner) -> {
+                var token = toGen(CassandraGenerators.token(partitioner)).next(rs);
+                Assertions.assertThat(token).isNotNull();
+            });
+    }
+
+    /** Every partitioner from {@link CassandraGenerators#partitioners()} must yield a non-null decorated key. */
+    @Test
+    public void partitionerKeys()
+    {
+        qt().forAll(Gens.random(), toGen(CassandraGenerators.partitioners()))
+            .check((rs, partitioner) -> {
+                var key = toGen(CassandraGenerators.decoratedKeys(ignored -> partitioner)).next(rs);
+                Assertions.assertThat(key).isNotNull();
+            });
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to