This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2c003710881860bde420d6a2dc1cb71e845bdb28
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Fri Apr 26 09:12:38 2024 +0100

    Push down repair tokens and partitioner through paxos repair
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
    CASSANDRA-19714
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   4 +-
 src/java/org/apache/cassandra/db/Keyspace.java     |   6 ++
 .../org/apache/cassandra/db/SnapshotCommand.java   |  54 +++++++++++--
 .../org/apache/cassandra/db/SystemKeyspace.java    |   6 +-
 .../cassandra/dht/RangeFetchMapCalculator.java     |   2 +-
 src/java/org/apache/cassandra/net/ParamType.java   |   3 -
 .../org/apache/cassandra/repair/RepairJobDesc.java |  31 ++++----
 .../cassandra/repair/consistent/LocalSessions.java |  17 ++--
 .../cassandra/repair/messages/SyncRequest.java     |   6 +-
 .../cassandra/service/SnapshotVerbHandler.java     |  11 +--
 .../service/paxos/PaxosRepairHistory.java          |  87 ++++++++++++++++-----
 .../paxos/cleanup/PaxosCleanupComplete.java        |   7 +-
 .../service/paxos/cleanup/PaxosCleanupHistory.java |   2 +-
 .../service/paxos/cleanup/PaxosCleanupRequest.java |   8 +-
 .../paxos/cleanup/PaxosStartPrepareCleanup.java    |   8 +-
 .../paxos/uncommitted/PaxosStateTracker.java       |   6 +-
 .../paxos/uncommitted/PaxosUncommittedTracker.java |   9 ++-
 .../paxos/uncommitted/UncommittedDataFile.java     |   3 +-
 .../paxos/uncommitted/UncommittedTableData.java    |  54 +++++++------
 .../cassandra/utils/DiagnosticSnapshotService.java |  22 ++++--
 .../apache/cassandra/utils/RangesSerializer.java   |  73 -----------------
 .../serialization/5.1/service.SyncComplete.bin     | Bin 344 -> 256 bytes
 .../data/serialization/5.1/service.SyncRequest.bin | Bin 155 -> 111 bytes
 .../5.1/service.ValidationComplete.bin             | Bin 729 -> 597 bytes
 .../5.1/service.ValidationRequest.bin              | Bin 118 -> 74 bytes
 .../distributed/test/tcm/PaxosRepairTCMTest.java   |  68 ++++++++++++++++
 .../cassandra/db/ColumnFamilyStoreMBeanTest.java   |   1 +
 .../service/paxos/PaxosRepairHistoryTest.java      |  21 ++---
 .../uncommitted/UncommittedTableDataTest.java      |   8 +-
 30 files changed, 309 insertions(+), 209 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 91d3d51abc..862df73c83 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Use table-specific partitioners during Paxos repair (CASSANDRA-19714)
  * Expose current compaction throughput in nodetool (CASSANDRA-13890)
  * CEP-24 Password validation / generation (CASSANDRA-17457)
  * Reconfigure CMS after replacement, bootstrap and move operations (CASSANDRA-19705)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 56d64d009c..d56d690f4a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2495,7 +2495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
     @Override
     public void forceCompactionForTokenRanges(String... strings)
     {
-        CompactionManager.instance.forceCompactionForTokenRange(this, 
toTokenRanges(DatabaseDescriptor.getPartitioner(), strings));
+        CompactionManager.instance.forceCompactionForTokenRange(this, 
toTokenRanges(getPartitioner(), strings));
     }
 
     static Set<Range<Token>> toTokenRanges(IPartitioner partitioner, String... 
strings)
@@ -3340,7 +3340,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
         if (ksName == null || cfName == null)
             return null;
 
-        Keyspace keyspace = Keyspace.open(ksName);
+        Keyspace keyspace = Keyspace.openIfExists(ksName);
         if (keyspace == null)
             return null;
 
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 551a4a77b2..7a18f112ba 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -149,6 +149,12 @@ public class Keyspace
         return ks;
     }
 
+    public static Keyspace openIfExists(String keyspaceName)
+    {
+        assert initialized || 
SchemaConstants.isLocalSystemKeyspace(keyspaceName) : "Initialized: " + 
initialized;
+        return Schema.instance.getKeyspaceInstance(keyspaceName);
+    }
+
     // to only be used by org.apache.cassandra.tools.Standalone* classes
     public static Keyspace openWithoutSSTables(String keyspaceName)
     {
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
index e909e50c94..a5e522abff 100644
--- a/src/java/org/apache/cassandra/db/SnapshotCommand.java
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -18,10 +18,17 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 
 public class SnapshotCommand
 {
@@ -31,13 +38,15 @@ public class SnapshotCommand
     public final String column_family;
     public final String snapshot_name;
     public final boolean clear_snapshot;
+    public final List<Range<Token>> ranges;
 
-    public SnapshotCommand(String keyspace, String columnFamily, String 
snapshotName, boolean clearSnapshot)
+    public SnapshotCommand(String keyspace, String columnFamily, 
List<Range<Token>> ranges, String snapshotName, boolean clearSnapshot)
     {
         this.keyspace = keyspace;
         this.column_family = columnFamily;
         this.snapshot_name = snapshotName;
         this.clear_snapshot = clearSnapshot;
+        this.ranges = ranges;
     }
 
     @Override
@@ -46,7 +55,8 @@ public class SnapshotCommand
         return "SnapshotCommand{" + "keyspace='" + keyspace + '\'' +
                                   ", column_family='" + column_family + '\'' +
                                   ", snapshot_name=" + snapshot_name +
-                                  ", clear_snapshot=" + clear_snapshot + '}';
+                                  ", clear_snapshot=" + clear_snapshot +
+                                  ", ranges=" + ranges + '}';
     }
 }
 
@@ -58,6 +68,15 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand>
         out.writeUTF(snapshot_command.column_family);
         out.writeUTF(snapshot_command.snapshot_name);
         out.writeBoolean(snapshot_command.clear_snapshot);
+        if (version >= MessagingService.VERSION_51)
+        {
+            out.writeUnsignedVInt32(snapshot_command.ranges.size());
+            for (Range<Token> r : snapshot_command.ranges)
+            {
+                Token.serializer.serialize(r.left, out, version);
+                Token.serializer.serialize(r.right, out, version);
+            }
+        }
     }
 
     public SnapshotCommand deserialize(DataInputPlus in, int version) throws 
IOException
@@ -66,14 +85,35 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand>
         String column_family = in.readUTF();
         String snapshot_name = in.readUTF();
         boolean clear_snapshot = in.readBoolean();
-        return new SnapshotCommand(keyspace, column_family, snapshot_name, 
clear_snapshot);
+        if (version >= MessagingService.VERSION_51)
+        {
+            IPartitioner partitioner = 
Keyspace.open(keyspace).getColumnFamilyStore(column_family).getPartitioner();
+            int count = in.readUnsignedVInt32();
+            List<Range<Token>> ranges = new ArrayList<>(count);
+            for (int i = 0; i < count; i++)
+            {
+                Token start = Token.serializer.deserialize(in, partitioner, 
version);
+                Token end = Token.serializer.deserialize(in, partitioner, 
version);
+                ranges.add(new Range<>(start, end));
+            }
+            return new SnapshotCommand(keyspace, column_family, ranges, 
snapshot_name, clear_snapshot);
+        }
+        return new SnapshotCommand(keyspace, column_family, 
Collections.emptyList(), snapshot_name, clear_snapshot);
     }
 
     public long serializedSize(SnapshotCommand sc, int version)
     {
-        return TypeSizes.sizeof(sc.keyspace)
-             + TypeSizes.sizeof(sc.column_family)
-             + TypeSizes.sizeof(sc.snapshot_name)
-             + TypeSizes.sizeof(sc.clear_snapshot);
+        long size =  TypeSizes.sizeof(sc.keyspace)
+                     + TypeSizes.sizeof(sc.column_family)
+                     + TypeSizes.sizeof(sc.snapshot_name)
+                     + TypeSizes.sizeof(sc.clear_snapshot);
+        if (version >= MessagingService.VERSION_51)
+        {
+            size += TypeSizes.sizeofUnsignedVInt(sc.ranges.size());
+            for (Range<Token> r : sc.ranges)
+                size += Token.serializer.serializedSize(r.left, version)
+                        + Token.serializer.serializedSize(r.right, version);
+        }
+        return size;
     }
 }
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0ee893c010..8709453280 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1549,16 +1549,16 @@ public final class SystemKeyspace
     public static PaxosRepairHistory loadPaxosRepairHistory(String keyspace, 
String table)
     {
         if (SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES.contains(keyspace))
-            return PaxosRepairHistory.EMPTY;
+            return PaxosRepairHistory.empty(keyspace, table);
 
         UntypedResultSet results = executeInternal(String.format("SELECT * 
FROM system.%s WHERE keyspace_name=? AND table_name=?", PAXOS_REPAIR_HISTORY), 
keyspace, table);
         if (results.isEmpty())
-            return PaxosRepairHistory.EMPTY;
+            return PaxosRepairHistory.empty(keyspace, table);
 
         UntypedResultSet.Row row = Iterables.getOnlyElement(results);
         List<ByteBuffer> points = row.getList("points", BytesType.instance);
 
-        return PaxosRepairHistory.fromTupleBufferList(points);
+        return PaxosRepairHistory.fromTupleBufferList(keyspace, table, points);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 8ddb0ec825..949cf99f09 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -98,7 +98,7 @@ public class RangeFetchMapCalculator
 
     static boolean isTrivial(Range<Token> range)
     {
-        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        IPartitioner partitioner = range.left.getPartitioner();
         if (partitioner.splitter().isPresent())
         {
             BigInteger l = 
partitioner.splitter().get().valueForToken(range.left);
diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java
index 77c0f32771..2367b1a390 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -23,11 +23,9 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.Int32Serializer;
 import org.apache.cassandra.utils.Int64Serializer;
-import org.apache.cassandra.utils.RangesSerializer;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static java.lang.Math.max;
-
 import static 
org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer;
 
 /**
@@ -55,7 +53,6 @@ public enum ParamType
     ROW_INDEX_READ_SIZE_FAIL         (12, Int64Serializer.serializer),
     ROW_INDEX_READ_SIZE_WARN         (13, Int64Serializer.serializer),
     CUSTOM_MAP                       (14, CustomParamsSerializer.serializer),
-    SNAPSHOT_RANGES                  (15, RangesSerializer.serializer),
     TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer),
     TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer);
 
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 796b1c6f57..360d0429e1 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -35,10 +35,9 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.TimeUUID;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -80,6 +79,17 @@ public class RepairJobDesc
         return UUID.nameUUIDFromBytes(bytes);
     }
 
+    public IPartitioner partitioner()
+    {
+        return partitioner(this.keyspace, this.columnFamily);
+    }
+
+    public static IPartitioner partitioner(String keyspace, String 
columnFamily)
+    {
+        TableMetadata tm = Schema.instance.getTableMetadata(keyspace, 
columnFamily);
+        return tm != null ? tm.partitioner : IPartitioner.global();
+    }
+
     @Override
     public String toString()
     {
@@ -125,8 +135,6 @@ public class RepairJobDesc
             desc.sessionId.serialize(out);
             out.writeUTF(desc.keyspace);
             out.writeUTF(desc.columnFamily);
-            if (version >= MessagingService.VERSION_51)
-                
out.writeUTF(getPartitioner(desc).getClass().getCanonicalName());
             out.writeInt(desc.ranges.size());
             for (Range<Token> rt : desc.ranges)
                 AbstractBounds.tokenSerializer.serialize(rt, out, version);
@@ -140,8 +148,9 @@ public class RepairJobDesc
             TimeUUID sessionId = TimeUUID.deserialize(in);
             String keyspace = in.readUTF();
             String columnFamily = in.readUTF();
+
             IPartitioner partitioner = version >= MessagingService.VERSION_51
-                                       ? 
FBUtilities.newPartitioner(in.readUTF())
+                                       ? partitioner(keyspace, columnFamily)
                                        : IPartitioner.global();
 
             int nRanges = in.readInt();
@@ -164,11 +173,6 @@ public class RepairJobDesc
             size += TimeUUID.sizeInBytes();
             size += TypeSizes.sizeof(desc.keyspace);
             size += TypeSizes.sizeof(desc.columnFamily);
-            if (version >= MessagingService.VERSION_51)
-            {
-                String partitioner = 
getPartitioner(desc).getClass().getCanonicalName();
-                size += TypeSizes.sizeof(partitioner);
-            }
             size += TypeSizes.sizeof(desc.ranges.size());
             for (Range<Token> rt : desc.ranges)
             {
@@ -177,12 +181,5 @@ public class RepairJobDesc
             return size;
         }
 
-        private IPartitioner getPartitioner(RepairJobDesc desc)
-        {
-            TableMetadata tm = 
ClusterMetadata.current().schema.getKeyspaceMetadata(desc.keyspace)
-                                                               
.getTableOrViewNullable(desc.columnFamily);
-            return tm != null ? tm.partitioner : IPartitioner.global();
-
-        }
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 420f6d4028..f74d2b337d 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -56,7 +57,6 @@ import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.Verb;
@@ -507,11 +507,10 @@ public class LocalSessions
         return buffers;
     }
 
-    private static Range<Token> deserializeRange(ByteBuffer bb)
+    private static Range<Token> deserializeRange(ByteBuffer bb, IPartitioner 
partitioner)
     {
         try (DataInputBuffer in = new DataInputBuffer(bb, false))
         {
-            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
             Token left = Token.serializer.deserialize(in, partitioner, 0);
             Token right = Token.serializer.deserialize(in, partitioner, 0);
             return new Range<>(left, right);
@@ -522,10 +521,10 @@ public class LocalSessions
         }
     }
 
-    private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers)
+    private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> 
buffers, IPartitioner partitioner)
     {
         Set<Range<Token>> ranges = new HashSet<>(buffers.size());
-        buffers.forEach(bb -> ranges.add(deserializeRange(bb)));
+        buffers.forEach(bb -> ranges.add(deserializeRange(bb, partitioner)));
         return ranges;
     }
 
@@ -579,9 +578,13 @@ public class LocalSessions
             row.getInetAddress("coordinator"),
             row.getInt("coordinator_port"));
         builder.withCoordinator(coordinator);
-        builder.withTableIds(uuidToTableId(row.getSet("cfids", 
UUIDType.instance)));
+        Set<TableId> tableIds = uuidToTableId(row.getSet("cfids", 
UUIDType.instance));
+        builder.withTableIds(tableIds);
         builder.withRepairedAt(row.getTimestamp("repaired_at").getTime());
-        builder.withRanges(deserializeRanges(row.getSet("ranges", 
BytesType.instance)));
+        Set<IPartitioner> partitioners = 
tableIds.stream().map(ColumnFamilyStore::getIfExists).filter(Objects::nonNull).map(ColumnFamilyStore::getPartitioner).collect(Collectors.toSet());
+        assert partitioners.size() <= 1 : "Mismatching partitioners for a 
localsession: " + partitioners;
+        IPartitioner partitioner = partitioners.isEmpty() ? 
IPartitioner.global() : partitioners.iterator().next();
+        builder.withRanges(deserializeRanges(row.getSet("ranges", 
BytesType.instance), partitioner));
         //There is no cross version streaming and thus no cross version repair 
so assume that
         //any valid repair sessions has the participants_wp column and any 
that doesn't is malformed
         Set<String> participants = row.getSet("participants_wp", 
UTF8Type.instance);
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index 92a1ac4eb8..137147d898 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -32,10 +32,8 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.MetaStrategy;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tcm.ClusterMetadata;
 
 import static 
org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
 
@@ -115,9 +113,7 @@ public class SyncRequest extends RepairMessage
             InetAddressAndPort dst = 
inetAddressAndPortSerializer.deserialize(in, version);
             int rangesCount = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
-            IPartitioner partitioner = 
ClusterMetadata.current().schema.getKeyspaceMetadata(desc.keyspace).params.replication.isMeta()
-                                       ? MetaStrategy.partitioner
-                                       : IPartitioner.global();
+            IPartitioner partitioner = desc.partitioner();
             for (int i = 0; i < rangesCount; ++i)
                 ranges.add((Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, partitioner, version));
             PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
index 99b5105406..db09578981 100644
--- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -17,22 +17,16 @@
  */
 package org.apache.cassandra.service;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.DiagnosticSnapshotService;
 
-import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES;
 
 public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
 {
@@ -48,10 +42,7 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand>
         }
         else if 
(DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command))
         {
-            List<Range<Token>> ranges = Collections.emptyList();
-            if (message.header.params().containsKey(SNAPSHOT_RANGES))
-                ranges = (List<Range<Token>>) 
message.header.params().get(SNAPSHOT_RANGES);
-            DiagnosticSnapshotService.snapshot(command, ranges, 
message.from());
+            DiagnosticSnapshotService.snapshot(command, message.from());
         }
         else
         {
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java
index d9e52a7409..fa99419626 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java
@@ -27,15 +27,19 @@ import java.util.stream.IntStream;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static java.lang.Math.min;
 import static org.apache.cassandra.service.paxos.Commit.isAfter;
@@ -43,9 +47,6 @@ import static org.apache.cassandra.service.paxos.Commit.latest;
 
 public class PaxosRepairHistory
 {
-    public static final PaxosRepairHistory EMPTY = new PaxosRepairHistory(new 
Token[0], new Ballot[] { Ballot.none() });
-    private static final Token.TokenFactory TOKEN_FACTORY = 
DatabaseDescriptor.getPartitioner().getTokenFactory();
-    private static final Token MIN_TOKEN = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
     private static final TupleType TYPE = new 
TupleType(ImmutableList.of(BytesType.instance, BytesType.instance));
 
     /**
@@ -63,12 +64,37 @@ public class PaxosRepairHistory
      *   (t4, MAX_VALUE) => none()
      */
 
+    public final IPartitioner partitioner;
     private final Token[] tokenInclusiveUpperBound;
     private final Ballot[] ballotLowBound; // always one longer to capture 
values up to "MAX_VALUE" (which in some cases doesn't exist, as is infinite)
 
-    PaxosRepairHistory(Token[] tokenInclusiveUpperBound, Ballot[] 
ballotLowBound)
+    private static IPartitioner partitioner(String keyspace, String table)
+    {
+        TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table);
+        return tm != null ? tm.partitioner : IPartitioner.global();
+    }
+
+    @VisibleForTesting
+    public static PaxosRepairHistory empty()
+    {
+        return empty(IPartitioner.global());
+    }
+
+    public static PaxosRepairHistory empty(String keyspace, String table)
+    {
+        return empty(partitioner(keyspace, table));
+    }
+
+    public static PaxosRepairHistory empty(IPartitioner partitioner)
+    {
+        return new PaxosRepairHistory(partitioner, new Token[0], new Ballot[] 
{ Ballot.none() } );
+    }
+
+    PaxosRepairHistory(IPartitioner partitioner,
+                       Token[] tokenInclusiveUpperBound, Ballot[] 
ballotLowBound)
     {
         assert ballotLowBound.length == tokenInclusiveUpperBound.length + 1;
+        this.partitioner = partitioner;
         this.tokenInclusiveUpperBound = tokenInclusiveUpperBound;
         this.ballotLowBound = ballotLowBound;
     }
@@ -158,23 +184,28 @@ public class PaxosRepairHistory
 
     private Token tokenExclusiveLowerBound(int i)
     {
-        return i == 0 ? MIN_TOKEN : tokenInclusiveUpperBound[i - 1];
+        return i == 0 ? partitioner.getMinimumToken() : 
tokenInclusiveUpperBound[i - 1];
     }
 
     private Token tokenInclusiveUpperBound(int i)
     {
-        return i == tokenInclusiveUpperBound.length ? MIN_TOKEN : 
tokenInclusiveUpperBound[i];
+        return i == tokenInclusiveUpperBound.length ? 
partitioner.getMinimumToken() : tokenInclusiveUpperBound[i];
     }
 
     public List<ByteBuffer> toTupleBufferList()
     {
         List<ByteBuffer> tuples = new ArrayList<>(size() + 1);
         for (int i = 0 ; i < 1 + size() ; ++i)
-            
tuples.add(TYPE.pack(TOKEN_FACTORY.toByteArray(tokenInclusiveUpperBound(i)), 
ballotLowBound[i].toBytes()));
+            
tuples.add(TYPE.pack(partitioner.getTokenFactory().toByteArray(tokenInclusiveUpperBound(i)),
 ballotLowBound[i].toBytes()));
         return tuples;
     }
 
-    public static PaxosRepairHistory fromTupleBufferList(List<ByteBuffer> 
tuples)
+    public static PaxosRepairHistory fromTupleBufferList(String keyspace, 
String table, List<ByteBuffer> tuples)
+    {
+        return fromTupleBufferList(partitioner(keyspace, table), tuples);
+    }
+
+    public static PaxosRepairHistory fromTupleBufferList(IPartitioner 
partitioner, List<ByteBuffer> tuples)
     {
         Token[] tokenInclusiveUpperBounds = new Token[tuples.size() - 1];
         Ballot[] ballotLowBounds = new Ballot[tuples.size()];
@@ -182,11 +213,11 @@ public class PaxosRepairHistory
         {
             List<ByteBuffer> elements = TYPE.unpack(tuples.get(i));
             if (i < tokenInclusiveUpperBounds.length)
-                tokenInclusiveUpperBounds[i] = 
TOKEN_FACTORY.fromByteArray(elements.get(0));
+                tokenInclusiveUpperBounds[i] = 
partitioner.getTokenFactory().fromByteArray(elements.get(0));
             ballotLowBounds[i] = Ballot.deserialize(elements.get(1));
         }
 
-        return new PaxosRepairHistory(tokenInclusiveUpperBounds, 
ballotLowBounds);
+        return new PaxosRepairHistory(partitioner, tokenInclusiveUpperBounds, 
ballotLowBounds);
     }
 
     // append the item to the given list, modifying the underlying list
@@ -200,7 +231,11 @@ public class PaxosRepairHistory
         if (historyRight == null)
             return historyLeft;
 
-        Builder builder = new Builder(historyLeft.size() + 
historyRight.size());
+        assert historyLeft.partitioner == historyRight.partitioner : 
String.format("Mismatching partitioners (%s != %s)",
+                                                                               
    historyLeft.partitioner,
+                                                                               
    historyRight.partitioner);
+
+        Builder builder = new Builder(historyLeft.partitioner, 
historyLeft.size() + historyRight.size());
 
         RangeIterator left = historyLeft.rangeIterator();
         RangeIterator right = historyRight.rangeIterator();
@@ -243,7 +278,7 @@ public class PaxosRepairHistory
     public static PaxosRepairHistory add(PaxosRepairHistory existing, 
Collection<Range<Token>> ranges, Ballot ballot)
     {
         ranges = Range.normalize(ranges);
-        Builder builder = new Builder(ranges.size() * 2);
+        Builder builder = new Builder(existing.partitioner, ranges.size() * 2);
         for (Range<Token> range : ranges)
         {
             // don't add a point for an opening min token, since it
@@ -261,7 +296,7 @@ public class PaxosRepairHistory
     @VisibleForTesting
     static PaxosRepairHistory trim(PaxosRepairHistory existing, 
Collection<Range<Token>> ranges)
     {
-        Builder builder = new Builder(existing.size());
+        Builder builder = new Builder(existing.partitioner, existing.size());
 
         ranges = Range.normalize(ranges);
         for (Range<Token> select : ranges)
@@ -310,10 +345,12 @@ public class PaxosRepairHistory
         else return a;
     }
 
-    public static final IVersionedSerializer<PaxosRepairHistory> serializer = 
new IVersionedSerializer<PaxosRepairHistory>()
+    public static final IVersionedSerializer<PaxosRepairHistory> serializer = 
new IVersionedSerializer<>()
     {
         public void serialize(PaxosRepairHistory history, DataOutputPlus out, 
int version) throws IOException
         {
+            if (version >= MessagingService.VERSION_51)
+                
out.writeUTF(history.partitioner.getClass().getCanonicalName());
             out.writeUnsignedVInt32(history.size());
             for (int i = 0; i < history.size() ; ++i)
             {
@@ -325,21 +362,27 @@ public class PaxosRepairHistory
 
         public PaxosRepairHistory deserialize(DataInputPlus in, int version) 
throws IOException
         {
+            IPartitioner partitioner = version >= MessagingService.VERSION_51
+                                       ? 
FBUtilities.newPartitioner(in.readUTF())
+                                       : IPartitioner.global();
             int size = in.readUnsignedVInt32();
             Token[] tokenInclusiveUpperBounds = new Token[size];
             Ballot[] ballotLowBounds = new Ballot[size + 1];
             for (int i = 0; i < size; i++)
             {
-                tokenInclusiveUpperBounds[i] = 
Token.serializer.deserialize(in, DatabaseDescriptor.getPartitioner(), version);
+                tokenInclusiveUpperBounds[i] = 
Token.serializer.deserialize(in, partitioner, version);
                 ballotLowBounds[i] = Ballot.deserialize(in);
             }
             ballotLowBounds[size] = Ballot.deserialize(in);
-            return new PaxosRepairHistory(tokenInclusiveUpperBounds, 
ballotLowBounds);
+            return new PaxosRepairHistory(partitioner, 
tokenInclusiveUpperBounds, ballotLowBounds);
         }
 
         public long serializedSize(PaxosRepairHistory history, int version)
         {
-            long size = TypeSizes.sizeofUnsignedVInt(history.size());
+            long size = 0;
+            if (version >= MessagingService.VERSION_51)
+                size += 
TypeSizes.sizeof(history.partitioner.getClass().getCanonicalName());
+            size += TypeSizes.sizeofUnsignedVInt(history.size());
             for (int i = 0; i < history.size() ; ++i)
             {
                 size += 
Token.serializer.serializedSize(history.tokenInclusiveUpperBound[i], version);
@@ -411,11 +454,13 @@ public class PaxosRepairHistory
 
     static class Builder
     {
+        final IPartitioner partitioner;
         final List<Token> tokenInclusiveUpperBounds;
         final List<Ballot> ballotLowBounds;
 
-        Builder(int capacity)
+        Builder(IPartitioner partitioner, int capacity)
         {
+            this.partitioner = partitioner;
             this.tokenInclusiveUpperBounds = new ArrayList<>(capacity);
             this.ballotLowBounds = new ArrayList<>(capacity + 1);
         }
@@ -473,7 +518,9 @@ public class PaxosRepairHistory
         {
             if (tokenInclusiveUpperBounds.size() == ballotLowBounds.size())
                 ballotLowBounds.add(Ballot.none());
-            return new 
PaxosRepairHistory(tokenInclusiveUpperBounds.toArray(new Token[0]), 
ballotLowBounds.toArray(new Ballot[0]));
+            return new PaxosRepairHistory(partitioner,
+                                          
tokenInclusiveUpperBounds.toArray(new Token[0]),
+                                          ballotLowBounds.toArray(new 
Ballot[0]));
         }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
index c0c9a7e1d4..682375668a 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java
@@ -24,6 +24,7 @@ import java.util.*;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -37,10 +38,10 @@ import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 
-import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_COMPLETE_REQ;
 
@@ -124,11 +125,13 @@ public class PaxosCleanupComplete extends 
AsyncFuture<Void> implements RequestCa
         {
             TableId tableId = TableId.deserialize(in);
             Ballot lowBound = Ballot.deserialize(in);
+            TableMetadata table = Schema.instance.getTableMetadata(tableId);
+            IPartitioner partitioner = table != null ? table.partitioner : 
IPartitioner.global();
             int numRanges = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>();
             for (int i = 0; i < numRanges; i++)
             {
-                Range<Token> range = (Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, getPartitioner(), version);
+                Range<Token> range = (Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, partitioner, version);
                 ranges.add(range);
             }
             return new Request(tableId, lowBound, ranges);
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java
index 70b4099eb7..04907e7964 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java
@@ -40,7 +40,7 @@ public class PaxosCleanupHistory
         this.history = history;
     }
 
-    public static final IVersionedSerializer<PaxosCleanupHistory> serializer = 
new IVersionedSerializer<PaxosCleanupHistory>()
+    public static final IVersionedSerializer<PaxosCleanupHistory> serializer = 
new IVersionedSerializer<>()
     {
         public void serialize(PaxosCleanupHistory message, DataOutputPlus out, 
int version) throws IOException
         {
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
index 6d3fb731ea..103441940b 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -40,7 +41,9 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.utils.UUIDSerializer;
@@ -129,12 +132,13 @@ public class PaxosCleanupRequest
         {
             UUID session = UUIDSerializer.serializer.deserialize(in, version);
             TableId tableId = TableId.deserialize(in);
-
+            TableMetadata table = Schema.instance.getTableMetadata(tableId);
+            IPartitioner partitioner = table != null ? table.partitioner : 
IPartitioner.global();
             int numRanges = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>(numRanges);
             for (int i=0; i<numRanges; i++)
             {
-                ranges.add((Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, 
DatabaseDescriptor.getPartitioner(), version));
+                ranges.add((Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, partitioner, version));
             }
             return new PaxosCleanupRequest(session, tableId, ranges);
         }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
index 41735526a4..ac9da8e0e7 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java
@@ -24,9 +24,9 @@ import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RequestFailureReason;
@@ -42,6 +42,7 @@ import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosRepairHistory;
@@ -159,12 +160,13 @@ public class PaxosStartPrepareCleanup extends 
AsyncFuture<PaxosCleanupHistory> i
         {
             TableId tableId = TableId.deserialize(in);
             EndpointState epState = EndpointState.serializer.deserialize(in, 
version);
-
+            TableMetadata table = Schema.instance.getTableMetadata(tableId);
+            IPartitioner partitioner = table != null ? table.partitioner : 
IPartitioner.global();
             int numRanges = in.readInt();
             List<Range<Token>> ranges = new ArrayList<>();
             for (int i = 0; i < numRanges; i++)
             {
-                Range<Token> range = (Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, 
DatabaseDescriptor.getPartitioner(), version);
+                Range<Token> range = (Range<Token>) 
AbstractBounds.tokenSerializer.deserialize(in, partitioner, version);
                 ranges.add(range);
             }
             return new Request(tableId, epState, ranges);
diff --git 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java
 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java
index 1b5f6a07fe..69494ee60e 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java
@@ -46,6 +46,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.Commit;
@@ -270,12 +271,13 @@ public class PaxosStateTracker
                         Row row = partition.next();
                         Clustering clustering = row.clustering();
                         String tableName = 
UTF8Type.instance.compose(clustering.get(0), clustering.accessor());
-                        if (Schema.instance.getTableMetadata(keyspaceName, 
tableName) == null)
+                        TableMetadata tm = 
Schema.instance.getTableMetadata(keyspaceName, tableName);
+                        if (tm == null)
                             continue;
 
                         Cell pointsCell = row.getCell(pointsColumn);
                         List<ByteBuffer> points = 
listType.compose(pointsCell.value(), pointsCell.accessor());
-                        PaxosRepairHistory history = 
PaxosRepairHistory.fromTupleBufferList(points);
+                        PaxosRepairHistory history = 
PaxosRepairHistory.fromTupleBufferList(tm.partitioner, points);
                         lowBound = Commit.latest(lowBound, 
history.maxLowBound());
                     }
                 }
diff --git 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
index 062de3cf59..44d147e134 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.File;
@@ -45,6 +46,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -191,7 +193,12 @@ public class PaxosUncommittedTracker
 
     public CloseableIterator<UncommittedPaxosKey> 
uncommittedKeyIterator(TableId tableId, Collection<Range<Token>> ranges)
     {
-        ranges = (ranges == null || ranges.isEmpty()) ? 
Collections.singleton(FULL_RANGE) : Range.normalize(ranges);
+        TableMetadata table = Schema.instance.getTableMetadata(tableId);
+        if (table == null || table.partitioner != IPartitioner.global())
+            ranges = Collections.singleton(FULL_RANGE);
+        else
+            ranges = (ranges == null || ranges.isEmpty()) ? 
Collections.singleton(FULL_RANGE) : Range.normalize(ranges);
+
         CloseableIterator<PaxosKeyState> updates = 
updateSupplier.repairIterator(tableId, ranges);
 
         try
diff --git 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java
 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java
index b2a5004b80..e440d60a1b 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java
@@ -34,7 +34,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.PeekingIterator;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.Range;
@@ -343,7 +342,7 @@ public class UncommittedDataFile
                 nextKey:
                 while (!reader.isEOF())
                 {
-                    DecoratedKey key = 
DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(reader));
+                    DecoratedKey key = 
currentRange.left.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(reader));
 
                     while (!currentRange.contains(key))
                     {
diff --git 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java
 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java
index 3130333813..bfc31c6d85 100644
--- 
a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java
+++ 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.concurrent.ExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.SchemaElement;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSReadError;
@@ -109,6 +110,7 @@ public class UncommittedTableData
         private final CloseableIterator<PaxosKeyState> wrapped;
         private final PeekingIterator<PaxosKeyState> peeking;
         private final PeekingIterator<Range<Token>> rangeIterator;
+        private final IPartitioner partitioner;
         private final PaxosRepairHistory.Searcher historySearcher;
 
         FilteringIterator(CloseableIterator<PaxosKeyState> wrapped, 
List<Range<Token>> ranges, PaxosRepairHistory history)
@@ -116,6 +118,7 @@ public class UncommittedTableData
             this.wrapped = wrapped;
             this.peeking = Iterators.peekingIterator(wrapped);
             this.rangeIterator = 
Iterators.peekingIterator(Range.normalize(ranges).iterator());
+            this.partitioner = history.partitioner;
             this.historySearcher = history.searcher();
         }
 
@@ -128,35 +131,25 @@ public class UncommittedTableData
 
                 Range<Token> range = rangeIterator.peek();
 
-                PaxosKeyState peeked = peeking.peek();
-                Token token = peeked.key.getToken();
-
-                // If repairing the distributed metadata log table, we skip 
the filtering of paxos keys where the token
-                // is outside the range of the repair. This check would be 
complicated by the fact that the system.paxos
-                // table keys are tokenized with the global partitioner, but 
the log table uses its own specific
-                // partitioner. This means that the repair ranges will be a 
ReversedLongLocalToken pair, while the
-                // tokens obtained from the PaxosKeyState iterator will be 
whatever the global partitioner uses.
-                // However, as the replicas of the distributed log table (i.e. 
CMS members) always replicate the entire
-                // table, the range check is superfluous in this case anyway.
-                // For the purposes of the actual paxos repair (and for paxos 
reads/writes in general), this mismatch is
-                // also fine as the keys/tokens are opaque to the paxos 
implentation itself.
-                if (range.left.getPartitioner() == MetaStrategy.partitioner)
+                Token token = peeking.peek().key.getToken();
+                if (!range.contains(token))
                 {
-                    assert 
peeked.tableId.equals(DistributedMetadataLogKeyspace.LOG_TABLE_ID);
-                }
-                else
-                {
-                    if (!range.contains(token))
-                    {
-                        if (range.right.compareTo(token) < 0)
-                            rangeIterator.next();
-                        else
-                            peeking.next();
-                        continue;
-                    }
+                    if (!range.right.isMinimum() && 
range.right.compareTo(token) < 0)
+                        rangeIterator.next();
+                    else
+                        peeking.next();
+                    continue;
                 }
 
                 PaxosKeyState next = peeking.next();
+                // If repairing a table with a partitioner different from 
IPartitioner.global(), such as the distributed
+                // metadata log table, we don't filter paxos keys outside the 
data range of the repair. Instead, we
+                // repair everything present for that table. Replicas of the 
distributed log table (i.e. CMS members)
+                // always replicate the entire table, so this is not much of 
an issue at present.
+                // In this case, we also need to obtain the appropriate token 
for the paxos key, according to the
+                // table specific partitioner, in order to look up the low 
bound ballot for it in the repair history.
+                if (partitioner != IPartitioner.global())
+                    token = partitioner.getToken(next.key.getKey());
 
                 Ballot lowBound = historySearcher.ballotForToken(token);
                 if (Commit.isAfter(lowBound, next.ballot))
@@ -204,6 +197,10 @@ public class UncommittedTableData
             if (table == null)
                 return Range.normalize(FULL_RANGE);
 
+            // for tables using a different partitioner to the globally 
configured one, don't filter anything
+            if (table.getPartitioner() != IPartitioner.global())
+                return Range.normalize(FULL_RANGE);
+
             String ksName = table.getKeyspaceName();
             Collection<Range<Token>> ranges = 
StorageService.instance.getLocalAndPendingRanges(ksName);
 
@@ -218,7 +215,12 @@ public class UncommittedTableData
         {
             ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(tableId);
             if (cfs == null)
-                return PaxosRepairHistory.EMPTY;
+            {
+                IPartitioner partitioner = 
tableId.equals(DistributedMetadataLogKeyspace.LOG_TABLE_ID)
+                                           ? MetaStrategy.partitioner
+                                           : IPartitioner.global();
+                return PaxosRepairHistory.empty(partitioner);
+            }
 
             return cfs.getPaxosRepairHistory();
         }
diff --git a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java 
b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
index 5329ceef0d..dcae575940 100644
--- a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
+++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java
@@ -22,7 +22,10 @@ import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -30,7 +33,9 @@ import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -41,10 +46,9 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.DIAGNOSTIC_SNAPSHOT_INTERVAL_NANOS;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
-import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES;
 
 /**
  * Provides a means to take snapshots when triggered by anomalous events or 
when the breaking of invariants is
@@ -108,10 +112,10 @@ public class DiagnosticSnapshotService
             || 
command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX);
     }
 
-    public static void snapshot(SnapshotCommand command, List<Range<Token>> 
ranges, InetAddressAndPort initiator)
+    public static void snapshot(SnapshotCommand command, InetAddressAndPort 
initiator)
     {
         Preconditions.checkArgument(isDiagnosticSnapshotRequest(command));
-        instance.maybeSnapshot(command, ranges, initiator);
+        instance.maybeSnapshot(command, command.ranges, initiator);
     }
 
     public static String getSnapshotName(String prefix)
@@ -138,14 +142,16 @@ public class DiagnosticSnapshotService
         long interval = DIAGNOSTIC_SNAPSHOT_INTERVAL_NANOS.getLong();
         if (now - last > interval && cached.compareAndSet(last, now))
         {
+            if (ranges.size() > MAX_SNAPSHOT_RANGE_COUNT)
+                ranges = Collections.emptyList();
+
             Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ,
                                                        new 
SnapshotCommand(metadata.keyspace,
                                                                            
metadata.name,
+                                                                           
ranges,
                                                                            
getSnapshotName(prefix),
                                                                            
false));
 
-            if (!ranges.isEmpty() && ranges.size() < MAX_SNAPSHOT_RANGE_COUNT)
-                msg = msg.withParam(SNAPSHOT_RANGES, ranges);
             for (InetAddressAndPort replica : endpoints)
                 MessagingService.instance().send(msg, replica);
         }
diff --git a/src/java/org/apache/cassandra/utils/RangesSerializer.java 
b/src/java/org/apache/cassandra/utils/RangesSerializer.java
deleted file mode 100644
index 5707503f6b..0000000000
--- a/src/java/org/apache/cassandra/utils/RangesSerializer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.utils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-public class RangesSerializer implements 
IVersionedSerializer<Collection<Range<Token>>>
-{
-    public static final RangesSerializer serializer = new RangesSerializer();
-
-    @Override
-    public void serialize(Collection<Range<Token>> ranges, DataOutputPlus out, 
int version) throws IOException
-    {
-        out.writeInt(ranges.size());
-        for (Range<Token> r : ranges)
-        {
-            Token.serializer.serialize(r.left, out, version);
-            Token.serializer.serialize(r.right, out, version);
-        }
-    }
-
-    @Override
-    public Collection<Range<Token>> deserialize(DataInputPlus in, int version) 
throws IOException
-    {
-        int count = in.readInt();
-        List<Range<Token>> ranges = new ArrayList<>(count);
-        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
-        for (int i = 0; i < count; i++)
-        {
-            Token start = Token.serializer.deserialize(in, partitioner, 
version);
-            Token end = Token.serializer.deserialize(in, partitioner, version);
-            ranges.add(new Range<>(start, end));
-        }
-        return ranges;
-    }
-
-    @Override
-    public long serializedSize(Collection<Range<Token>> ranges, int version)
-    {
-        int size = TypeSizes.sizeof(ranges.size());
-        if (ranges.size() > 0)
-            size += ranges.size() * 2 * 
Token.serializer.serializedSize(ranges.iterator().next().left, version);
-        return size;
-    }
-}
diff --git a/test/data/serialization/5.1/service.SyncComplete.bin 
b/test/data/serialization/5.1/service.SyncComplete.bin
index 69ddbdda04..b5f3633e7b 100644
Binary files a/test/data/serialization/5.1/service.SyncComplete.bin and 
b/test/data/serialization/5.1/service.SyncComplete.bin differ
diff --git a/test/data/serialization/5.1/service.SyncRequest.bin 
b/test/data/serialization/5.1/service.SyncRequest.bin
index d1046c1435..f853b20f9c 100644
Binary files a/test/data/serialization/5.1/service.SyncRequest.bin and 
b/test/data/serialization/5.1/service.SyncRequest.bin differ
diff --git a/test/data/serialization/5.1/service.ValidationComplete.bin 
b/test/data/serialization/5.1/service.ValidationComplete.bin
index 85c0940b20..888aa1a5a8 100644
Binary files a/test/data/serialization/5.1/service.ValidationComplete.bin and 
b/test/data/serialization/5.1/service.ValidationComplete.bin differ
diff --git a/test/data/serialization/5.1/service.ValidationRequest.bin 
b/test/data/serialization/5.1/service.ValidationRequest.bin
index 5f4cbf664b..04c492a8a1 100644
Binary files a/test/data/serialization/5.1/service.ValidationRequest.bin and 
b/test/data/serialization/5.1/service.ValidationRequest.bin differ
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tcm/PaxosRepairTCMTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/PaxosRepairTCMTest.java
new file mode 100644
index 0000000000..3a2bb0f112
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tcm/PaxosRepairTCMTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+public class PaxosRepairTCMTest extends TestBaseImpl
+{
+    @Test
+    public void paxosRepairWithReversePartitionerTest() throws IOException, 
InterruptedException
+    {
+        try (Cluster cluster = init(Cluster.build(2).withConfig(c -> 
c.with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"2").asserts().success();
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key)"));
+            for (int i = 0; i < 1000; i++)
+                cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (id) values (?)"), ConsistencyLevel.ALL, i);
+            cluster.forEach(i -> i.flush(KEYSPACE));
+
+            // flush system.paxos to create some UncommittedTableData files
+            for (int i = 0; i < 3; i++)
+            {
+                for (int j = 0; j < 3; j++)
+                    cluster.schemaChange(withKeyspace(String.format("alter 
table %%s.tbl with comment = 'comment " + i + j + "'", i, j)));
+                cluster.forEach(inst -> inst.flush("system"));
+            }
+
+            // unflushed rows in the system.paxos memtable
+            for (int i = 0; i < 3; i++)
+                cluster.schemaChange(withKeyspace(String.format("alter table 
%%s.tbl with comment = 'comment x " + i + "'", i)));
+
+            long start = Long.MIN_VALUE;
+            long interval = Long.MAX_VALUE/5;
+            long end = start + interval;
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.get(1).nodetoolResult("repair", "-full", "-st", 
String.valueOf(start), "-et", String.valueOf(end));
+                start += interval;
+                end += interval;
+            }
+
+            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int 
primary key)"));
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java
index 8d15f678e3..006ace0fb5 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java
@@ -77,6 +77,7 @@ public class ColumnFamilyStoreMBeanTest
     public void testInvalidateTokenRangesFormat()
     {
         ColumnFamilyStore store = Mockito.mock(ColumnFamilyStore.class);
+        Mockito.when(store.getPartitioner()).thenReturn(ByteOrderedPartitioner.instance);
         Mockito.doCallRealMethod().when(store).forceCompactionForTokenRanges(Mockito.any());
         IPartitioner previous = DatabaseDescriptor.getPartitioner();
         try
diff --git a/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java b/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java
index cd13616ccd..9846895a12 100644
--- a/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java
+++ b/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -116,7 +117,7 @@ public class PaxosRepairHistoryTest
             ballots[i] = points[i].right;
         }
         ballots[length - 1] = length == points.length ? points[length - 1].right : none();
-        return new PaxosRepairHistory(tokens, ballots);
+        return new PaxosRepairHistory(IPartitioner.global(), tokens, ballots);
     }
 
     static
@@ -127,7 +128,7 @@ public class PaxosRepairHistoryTest
 
     private static class Builder
     {
-        PaxosRepairHistory history = PaxosRepairHistory.EMPTY;
+        PaxosRepairHistory history = PaxosRepairHistory.empty();
 
         Builder add(Ballot ballot, Range<Token>... ranges)
         {
@@ -137,7 +138,7 @@ public class PaxosRepairHistoryTest
 
         Builder clear()
         {
-            history = PaxosRepairHistory.EMPTY;
+            history = PaxosRepairHistory.empty();
             return this;
         }
     }
@@ -149,7 +150,7 @@ public class PaxosRepairHistoryTest
 
     private static void checkSystemTableIO(PaxosRepairHistory history)
     {
-        Assert.assertEquals(history, PaxosRepairHistory.fromTupleBufferList(history.toTupleBufferList()));
+        Assert.assertEquals(history, PaxosRepairHistory.fromTupleBufferList(IPartitioner.global(), history.toTupleBufferList()));
         String tableName = "test" + tableNum.getAndIncrement();
         SystemKeyspace.savePaxosRepairHistory("test", tableName, history, false);
         Assert.assertEquals(history, SystemKeyspace.loadPaxosRepairHistory("test", tableName));
@@ -245,7 +246,7 @@ public class PaxosRepairHistoryTest
     public void testRegression()
     {
         Assert.assertEquals(none(), trim(
-                new PaxosRepairHistory(
+                new PaxosRepairHistory(IPartitioner.global(),
                         tks(-9223372036854775807L, -3952873730080618203L, -1317624576693539401L, 1317624576693539401L, 6588122883467697005L),
                         uuids("1382954c-1dd2-11b2-8fb2-f45d70d6d6d8", "138260a4-1dd2-11b2-abb2-c13c36b179e1", "1382951a-1dd2-11b2-1dd8-b7e242b38dbe", "138294fc-1dd2-11b2-83c4-43fb3a552386", "13829510-1dd2-11b2-f353-381f2ed963fa", "1382954c-1dd2-11b2-8fb2-f45d70d6d6d8")),
                 Collections.singleton(new Range<>(new LongToken(-1317624576693539401L), new LongToken(1317624576693539401L))))
@@ -256,8 +257,8 @@ public class PaxosRepairHistoryTest
     public void testInequality()
     {
         Collection<Range<Token>> ranges = Collections.singleton(new Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM));
-        PaxosRepairHistory a = PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ranges, none());
-        PaxosRepairHistory b = PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ranges, nextBallot(NONE));
+        PaxosRepairHistory a = PaxosRepairHistory.add(PaxosRepairHistory.empty(), ranges, none());
+        PaxosRepairHistory b = PaxosRepairHistory.add(PaxosRepairHistory.empty(), ranges, nextBallot(NONE));
         Assert.assertNotEquals(a, b);
     }
 
@@ -342,7 +343,7 @@ public class PaxosRepairHistoryTest
             }
         }
 
-        PaxosRepairHistory merged = PaxosRepairHistory.EMPTY;
+        PaxosRepairHistory merged = PaxosRepairHistory.empty(Murmur3Partitioner.instance);
         for (PaxosRepairHistory split : splits)
             merged = PaxosRepairHistory.merge(merged, split);
 
@@ -418,7 +419,7 @@ public class PaxosRepairHistoryTest
 
     static class RandomPaxosRepairHistory
     {
-        PaxosRepairHistory test = PaxosRepairHistory.EMPTY;
+        PaxosRepairHistory test = PaxosRepairHistory.empty(Murmur3Partitioner.instance);
 
         void add(Collection<Range<Token>> ranges, Ballot ballot)
         {
@@ -502,7 +503,7 @@ public class PaxosRepairHistoryTest
 
         void serdeser()
         {
-            PaxosRepairHistory tmp = PaxosRepairHistory.fromTupleBufferList(test.toTupleBufferList());
+            PaxosRepairHistory tmp = PaxosRepairHistory.fromTupleBufferList(Murmur3Partitioner.instance, test.toTupleBufferList());
             Assert.assertEquals(test, tmp);
             test = tmp;
         }
diff --git a/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java b/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java
index 70b1f254c1..e275bcdd89 100644
--- a/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java
+++ b/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java
@@ -91,7 +91,7 @@ public class UncommittedTableDataTest
 
         PaxosRepairHistory getPaxosRepairHistory()
         {
-            return PaxosRepairHistory.EMPTY;
+            return PaxosRepairHistory.empty(PARTITIONER);
         }
     };
 
@@ -583,7 +583,7 @@ public class UncommittedTableDataTest
 
             PaxosRepairHistory getPaxosRepairHistory()
             {
-                return PaxosRepairHistory.EMPTY;
+                return PaxosRepairHistory.empty(PARTITIONER);
             }
         });
 
@@ -608,7 +608,7 @@ public class UncommittedTableDataTest
 
             PaxosRepairHistory getPaxosRepairHistory()
             {
-                return PaxosRepairHistory.EMPTY;
+                return PaxosRepairHistory.empty(PARTITIONER);
             }
         });
 
@@ -636,7 +636,7 @@ public class UncommittedTableDataTest
 
             PaxosRepairHistory getPaxosRepairHistory()
             {
-                return PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ALL_RANGES, ballots[1]);
+                return PaxosRepairHistory.add(PaxosRepairHistory.empty(PARTITIONER), ALL_RANGES, ballots[1]);
             }
         });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to