This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 2c003710881860bde420d6a2dc1cb71e845bdb28 Author: Sam Tunnicliffe <s...@apache.org> AuthorDate: Fri Apr 26 09:12:38 2024 +0100 Push down repair tokens and partitioner through paxos repair Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-19714 --- CHANGES.txt | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +- src/java/org/apache/cassandra/db/Keyspace.java | 6 ++ .../org/apache/cassandra/db/SnapshotCommand.java | 54 +++++++++++-- .../org/apache/cassandra/db/SystemKeyspace.java | 6 +- .../cassandra/dht/RangeFetchMapCalculator.java | 2 +- src/java/org/apache/cassandra/net/ParamType.java | 3 - .../org/apache/cassandra/repair/RepairJobDesc.java | 31 ++++---- .../cassandra/repair/consistent/LocalSessions.java | 17 ++-- .../cassandra/repair/messages/SyncRequest.java | 6 +- .../cassandra/service/SnapshotVerbHandler.java | 11 +-- .../service/paxos/PaxosRepairHistory.java | 87 ++++++++++++++++----- .../paxos/cleanup/PaxosCleanupComplete.java | 7 +- .../service/paxos/cleanup/PaxosCleanupHistory.java | 2 +- .../service/paxos/cleanup/PaxosCleanupRequest.java | 8 +- .../paxos/cleanup/PaxosStartPrepareCleanup.java | 8 +- .../paxos/uncommitted/PaxosStateTracker.java | 6 +- .../paxos/uncommitted/PaxosUncommittedTracker.java | 9 ++- .../paxos/uncommitted/UncommittedDataFile.java | 3 +- .../paxos/uncommitted/UncommittedTableData.java | 54 +++++++------ .../cassandra/utils/DiagnosticSnapshotService.java | 22 ++++-- .../apache/cassandra/utils/RangesSerializer.java | 73 ----------------- .../serialization/5.1/service.SyncComplete.bin | Bin 344 -> 256 bytes .../data/serialization/5.1/service.SyncRequest.bin | Bin 155 -> 111 bytes .../5.1/service.ValidationComplete.bin | Bin 729 -> 597 bytes .../5.1/service.ValidationRequest.bin | Bin 118 -> 74 bytes .../distributed/test/tcm/PaxosRepairTCMTest.java | 68 ++++++++++++++++ .../cassandra/db/ColumnFamilyStoreMBeanTest.java | 1 + .../service/paxos/PaxosRepairHistoryTest.java | 21 ++--- 
.../uncommitted/UncommittedTableDataTest.java | 8 +- 30 files changed, 309 insertions(+), 209 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 91d3d51abc..862df73c83 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Use table-specific partitioners during Paxos repair (CASSANDRA-19714) * Expose current compaction throughput in nodetool (CASSANDRA-13890) * CEP-24 Password validation / generation (CASSANDRA-17457) * Reconfigure CMS after replacement, bootstrap and move operations (CASSANDRA-19705) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 56d64d009c..d56d690f4a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2495,7 +2495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner @Override public void forceCompactionForTokenRanges(String... strings) { - CompactionManager.instance.forceCompactionForTokenRange(this, toTokenRanges(DatabaseDescriptor.getPartitioner(), strings)); + CompactionManager.instance.forceCompactionForTokenRange(this, toTokenRanges(getPartitioner(), strings)); } static Set<Range<Token>> toTokenRanges(IPartitioner partitioner, String... 
strings) @@ -3340,7 +3340,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner if (ksName == null || cfName == null) return null; - Keyspace keyspace = Keyspace.open(ksName); + Keyspace keyspace = Keyspace.openIfExists(ksName); if (keyspace == null) return null; diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 551a4a77b2..7a18f112ba 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -149,6 +149,12 @@ public class Keyspace return ks; } + public static Keyspace openIfExists(String keyspaceName) + { + assert initialized || SchemaConstants.isLocalSystemKeyspace(keyspaceName) : "Initialized: " + initialized; + return Schema.instance.getKeyspaceInstance(keyspaceName); + } + // to only be used by org.apache.cassandra.tools.Standalone* classes public static Keyspace openWithoutSSTables(String keyspaceName) { diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java index e909e50c94..a5e522abff 100644 --- a/src/java/org/apache/cassandra/db/SnapshotCommand.java +++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java @@ -18,10 +18,17 @@ package org.apache.cassandra.db; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; public class SnapshotCommand { @@ -31,13 +38,15 @@ public class SnapshotCommand public final String column_family; public final String snapshot_name; public final boolean clear_snapshot; + public final List<Range<Token>> ranges; - public 
SnapshotCommand(String keyspace, String columnFamily, String snapshotName, boolean clearSnapshot) + public SnapshotCommand(String keyspace, String columnFamily, List<Range<Token>> ranges, String snapshotName, boolean clearSnapshot) { this.keyspace = keyspace; this.column_family = columnFamily; this.snapshot_name = snapshotName; this.clear_snapshot = clearSnapshot; + this.ranges = ranges; } @Override @@ -46,7 +55,8 @@ public class SnapshotCommand return "SnapshotCommand{" + "keyspace='" + keyspace + '\'' + ", column_family='" + column_family + '\'' + ", snapshot_name=" + snapshot_name + - ", clear_snapshot=" + clear_snapshot + '}'; + ", clear_snapshot=" + clear_snapshot + + ", ranges=" + ranges + '}'; } } @@ -58,6 +68,15 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand> out.writeUTF(snapshot_command.column_family); out.writeUTF(snapshot_command.snapshot_name); out.writeBoolean(snapshot_command.clear_snapshot); + if (version >= MessagingService.VERSION_51) + { + out.writeUnsignedVInt32(snapshot_command.ranges.size()); + for (Range<Token> r : snapshot_command.ranges) + { + Token.serializer.serialize(r.left, out, version); + Token.serializer.serialize(r.right, out, version); + } + } } public SnapshotCommand deserialize(DataInputPlus in, int version) throws IOException @@ -66,14 +85,35 @@ class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand> String column_family = in.readUTF(); String snapshot_name = in.readUTF(); boolean clear_snapshot = in.readBoolean(); - return new SnapshotCommand(keyspace, column_family, snapshot_name, clear_snapshot); + if (version >= MessagingService.VERSION_51) + { + IPartitioner partitioner = Keyspace.open(keyspace).getColumnFamilyStore(column_family).getPartitioner(); + int count = in.readUnsignedVInt32(); + List<Range<Token>> ranges = new ArrayList<>(count); + for (int i = 0; i < count; i++) + { + Token start = Token.serializer.deserialize(in, partitioner, version); + Token end = 
Token.serializer.deserialize(in, partitioner, version); + ranges.add(new Range<>(start, end)); + } + return new SnapshotCommand(keyspace, column_family, ranges, snapshot_name, clear_snapshot); + } + return new SnapshotCommand(keyspace, column_family, Collections.emptyList(), snapshot_name, clear_snapshot); } public long serializedSize(SnapshotCommand sc, int version) { - return TypeSizes.sizeof(sc.keyspace) - + TypeSizes.sizeof(sc.column_family) - + TypeSizes.sizeof(sc.snapshot_name) - + TypeSizes.sizeof(sc.clear_snapshot); + long size = TypeSizes.sizeof(sc.keyspace) + + TypeSizes.sizeof(sc.column_family) + + TypeSizes.sizeof(sc.snapshot_name) + + TypeSizes.sizeof(sc.clear_snapshot); + if (version >= MessagingService.VERSION_51) + { + size += TypeSizes.sizeofUnsignedVInt(sc.ranges.size()); + for (Range<Token> r : sc.ranges) + size += Token.serializer.serializedSize(r.left, version) + + Token.serializer.serializedSize(r.right, version); + } + return size; } } diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 0ee893c010..8709453280 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1549,16 +1549,16 @@ public final class SystemKeyspace public static PaxosRepairHistory loadPaxosRepairHistory(String keyspace, String table) { if (SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES.contains(keyspace)) - return PaxosRepairHistory.EMPTY; + return PaxosRepairHistory.empty(keyspace, table); UntypedResultSet results = executeInternal(String.format("SELECT * FROM system.%s WHERE keyspace_name=? 
AND table_name=?", PAXOS_REPAIR_HISTORY), keyspace, table); if (results.isEmpty()) - return PaxosRepairHistory.EMPTY; + return PaxosRepairHistory.empty(keyspace, table); UntypedResultSet.Row row = Iterables.getOnlyElement(results); List<ByteBuffer> points = row.getList("points", BytesType.instance); - return PaxosRepairHistory.fromTupleBufferList(points); + return PaxosRepairHistory.fromTupleBufferList(keyspace, table, points); } /** diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java index 8ddb0ec825..949cf99f09 100644 --- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java +++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java @@ -98,7 +98,7 @@ public class RangeFetchMapCalculator static boolean isTrivial(Range<Token> range) { - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + IPartitioner partitioner = range.left.getPartitioner(); if (partitioner.splitter().isPresent()) { BigInteger l = partitioner.splitter().get().valueForToken(range.left); diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java index 77c0f32771..2367b1a390 100644 --- a/src/java/org/apache/cassandra/net/ParamType.java +++ b/src/java/org/apache/cassandra/net/ParamType.java @@ -23,11 +23,9 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.Int32Serializer; import org.apache.cassandra.utils.Int64Serializer; -import org.apache.cassandra.utils.RangesSerializer; import org.apache.cassandra.utils.TimeUUID; import static java.lang.Math.max; - import static org.apache.cassandra.locator.InetAddressAndPort.FwdFrmSerializer.fwdFrmSerializer; /** @@ -55,7 +53,6 @@ public enum ParamType ROW_INDEX_READ_SIZE_FAIL (12, Int64Serializer.serializer), ROW_INDEX_READ_SIZE_WARN (13, Int64Serializer.serializer), CUSTOM_MAP (14, 
CustomParamsSerializer.serializer), - SNAPSHOT_RANGES (15, RangesSerializer.serializer), TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer), TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer); diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java index 796b1c6f57..360d0429e1 100644 --- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java +++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java @@ -35,10 +35,9 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.TimeUUID; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -80,6 +79,17 @@ public class RepairJobDesc return UUID.nameUUIDFromBytes(bytes); } + public IPartitioner partitioner() + { + return partitioner(this.keyspace, this.columnFamily); + } + + public static IPartitioner partitioner(String keyspace, String columnFamily) + { + TableMetadata tm = Schema.instance.getTableMetadata(keyspace, columnFamily); + return tm != null ? 
tm.partitioner : IPartitioner.global(); + } + @Override public String toString() { @@ -125,8 +135,6 @@ public class RepairJobDesc desc.sessionId.serialize(out); out.writeUTF(desc.keyspace); out.writeUTF(desc.columnFamily); - if (version >= MessagingService.VERSION_51) - out.writeUTF(getPartitioner(desc).getClass().getCanonicalName()); out.writeInt(desc.ranges.size()); for (Range<Token> rt : desc.ranges) AbstractBounds.tokenSerializer.serialize(rt, out, version); @@ -140,8 +148,9 @@ public class RepairJobDesc TimeUUID sessionId = TimeUUID.deserialize(in); String keyspace = in.readUTF(); String columnFamily = in.readUTF(); + IPartitioner partitioner = version >= MessagingService.VERSION_51 - ? FBUtilities.newPartitioner(in.readUTF()) + ? partitioner(keyspace, columnFamily) : IPartitioner.global(); int nRanges = in.readInt(); @@ -164,11 +173,6 @@ public class RepairJobDesc size += TimeUUID.sizeInBytes(); size += TypeSizes.sizeof(desc.keyspace); size += TypeSizes.sizeof(desc.columnFamily); - if (version >= MessagingService.VERSION_51) - { - String partitioner = getPartitioner(desc).getClass().getCanonicalName(); - size += TypeSizes.sizeof(partitioner); - } size += TypeSizes.sizeof(desc.ranges.size()); for (Range<Token> rt : desc.ranges) { @@ -177,12 +181,5 @@ public class RepairJobDesc return size; } - private IPartitioner getPartitioner(RepairJobDesc desc) - { - TableMetadata tm = ClusterMetadata.current().schema.getKeyspaceMetadata(desc.keyspace) - .getTableOrViewNullable(desc.columnFamily); - return tm != null ? 
tm.partitioner : IPartitioner.global(); - - } } } diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index 420f6d4028..f74d2b337d 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; @@ -56,7 +57,6 @@ import org.apache.cassandra.locator.RangesAtEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.Verb; @@ -507,11 +507,10 @@ public class LocalSessions return buffers; } - private static Range<Token> deserializeRange(ByteBuffer bb) + private static Range<Token> deserializeRange(ByteBuffer bb, IPartitioner partitioner) { try (DataInputBuffer in = new DataInputBuffer(bb, false)) { - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); Token left = Token.serializer.deserialize(in, partitioner, 0); Token right = Token.serializer.deserialize(in, partitioner, 0); return new Range<>(left, right); @@ -522,10 +521,10 @@ public class LocalSessions } } - private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers) + private static Set<Range<Token>> deserializeRanges(Set<ByteBuffer> buffers, IPartitioner partitioner) { Set<Range<Token>> ranges = new HashSet<>(buffers.size()); - buffers.forEach(bb -> ranges.add(deserializeRange(bb))); + buffers.forEach(bb -> ranges.add(deserializeRange(bb, partitioner))); return ranges; } @@ -579,9 +578,13 @@ public class LocalSessions row.getInetAddress("coordinator"), row.getInt("coordinator_port")); 
builder.withCoordinator(coordinator); - builder.withTableIds(uuidToTableId(row.getSet("cfids", UUIDType.instance))); + Set<TableId> tableIds = uuidToTableId(row.getSet("cfids", UUIDType.instance)); + builder.withTableIds(tableIds); builder.withRepairedAt(row.getTimestamp("repaired_at").getTime()); - builder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance))); + Set<IPartitioner> partitioners = tableIds.stream().map(ColumnFamilyStore::getIfExists).filter(Objects::nonNull).map(ColumnFamilyStore::getPartitioner).collect(Collectors.toSet()); + assert partitioners.size() <= 1 : "Mismatching partitioners for a localsession: " + partitioners; + IPartitioner partitioner = partitioners.isEmpty() ? IPartitioner.global() : partitioners.iterator().next(); + builder.withRanges(deserializeRanges(row.getSet("ranges", BytesType.instance), partitioner)); //There is no cross version streaming and thus no cross version repair so assume that //any valid repair sessions has the participants_wp column and any that doesn't is malformed Set<String> participants = row.getSet("participants_wp", UTF8Type.instance); diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index 92a1ac4eb8..137147d898 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -32,10 +32,8 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.MetaStrategy; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.tcm.ClusterMetadata; import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer; @@ -115,9 +113,7 @@ 
public class SyncRequest extends RepairMessage InetAddressAndPort dst = inetAddressAndPortSerializer.deserialize(in, version); int rangesCount = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(rangesCount); - IPartitioner partitioner = ClusterMetadata.current().schema.getKeyspaceMetadata(desc.keyspace).params.replication.isMeta() - ? MetaStrategy.partitioner - : IPartitioner.global(); + IPartitioner partitioner = desc.partitioner(); for (int i = 0; i < rangesCount; ++i) ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, partitioner, version)); PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java index 99b5105406..db09578981 100644 --- a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java +++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java @@ -17,22 +17,16 @@ */ package org.apache.cassandra.service; -import java.util.Collections; -import java.util.List; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SnapshotCommand; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.DiagnosticSnapshotService; -import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES; public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> { @@ -48,10 +42,7 @@ public class SnapshotVerbHandler implements IVerbHandler<SnapshotCommand> } else if (DiagnosticSnapshotService.isDiagnosticSnapshotRequest(command)) { - List<Range<Token>> ranges = Collections.emptyList(); - if (message.header.params().containsKey(SNAPSHOT_RANGES)) - ranges = (List<Range<Token>>) 
message.header.params().get(SNAPSHOT_RANGES); - DiagnosticSnapshotService.snapshot(command, ranges, message.from()); + DiagnosticSnapshotService.snapshot(command, message.from()); } else { diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java b/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java index d9e52a7409..fa99419626 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosRepairHistory.java @@ -27,15 +27,19 @@ import java.util.stream.IntStream; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.TupleType; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.FBUtilities; import static java.lang.Math.min; import static org.apache.cassandra.service.paxos.Commit.isAfter; @@ -43,9 +47,6 @@ import static org.apache.cassandra.service.paxos.Commit.latest; public class PaxosRepairHistory { - public static final PaxosRepairHistory EMPTY = new PaxosRepairHistory(new Token[0], new Ballot[] { Ballot.none() }); - private static final Token.TokenFactory TOKEN_FACTORY = DatabaseDescriptor.getPartitioner().getTokenFactory(); - private static final Token MIN_TOKEN = DatabaseDescriptor.getPartitioner().getMinimumToken(); private static final TupleType TYPE = new TupleType(ImmutableList.of(BytesType.instance, 
BytesType.instance)); /** @@ -63,12 +64,37 @@ public class PaxosRepairHistory * (t4, MAX_VALUE) => none() */ + public final IPartitioner partitioner; private final Token[] tokenInclusiveUpperBound; private final Ballot[] ballotLowBound; // always one longer to capture values up to "MAX_VALUE" (which in some cases doesn't exist, as is infinite) - PaxosRepairHistory(Token[] tokenInclusiveUpperBound, Ballot[] ballotLowBound) + private static IPartitioner partitioner(String keyspace, String table) + { + TableMetadata tm = Schema.instance.getTableMetadata(keyspace, table); + return tm != null ? tm.partitioner : IPartitioner.global(); + } + + @VisibleForTesting + public static PaxosRepairHistory empty() + { + return empty(IPartitioner.global()); + } + + public static PaxosRepairHistory empty(String keyspace, String table) + { + return empty(partitioner(keyspace, table)); + } + + public static PaxosRepairHistory empty(IPartitioner partitioner) + { + return new PaxosRepairHistory(partitioner, new Token[0], new Ballot[] { Ballot.none() } ); + } + + PaxosRepairHistory(IPartitioner partitioner, + Token[] tokenInclusiveUpperBound, Ballot[] ballotLowBound) { assert ballotLowBound.length == tokenInclusiveUpperBound.length + 1; + this.partitioner = partitioner; this.tokenInclusiveUpperBound = tokenInclusiveUpperBound; this.ballotLowBound = ballotLowBound; } @@ -158,23 +184,28 @@ public class PaxosRepairHistory private Token tokenExclusiveLowerBound(int i) { - return i == 0 ? MIN_TOKEN : tokenInclusiveUpperBound[i - 1]; + return i == 0 ? partitioner.getMinimumToken() : tokenInclusiveUpperBound[i - 1]; } private Token tokenInclusiveUpperBound(int i) { - return i == tokenInclusiveUpperBound.length ? MIN_TOKEN : tokenInclusiveUpperBound[i]; + return i == tokenInclusiveUpperBound.length ? 
partitioner.getMinimumToken() : tokenInclusiveUpperBound[i]; } public List<ByteBuffer> toTupleBufferList() { List<ByteBuffer> tuples = new ArrayList<>(size() + 1); for (int i = 0 ; i < 1 + size() ; ++i) - tuples.add(TYPE.pack(TOKEN_FACTORY.toByteArray(tokenInclusiveUpperBound(i)), ballotLowBound[i].toBytes())); + tuples.add(TYPE.pack(partitioner.getTokenFactory().toByteArray(tokenInclusiveUpperBound(i)), ballotLowBound[i].toBytes())); return tuples; } - public static PaxosRepairHistory fromTupleBufferList(List<ByteBuffer> tuples) + public static PaxosRepairHistory fromTupleBufferList(String keyspace, String table, List<ByteBuffer> tuples) + { + return fromTupleBufferList(partitioner(keyspace, table), tuples); + } + + public static PaxosRepairHistory fromTupleBufferList(IPartitioner partitioner, List<ByteBuffer> tuples) { Token[] tokenInclusiveUpperBounds = new Token[tuples.size() - 1]; Ballot[] ballotLowBounds = new Ballot[tuples.size()]; @@ -182,11 +213,11 @@ public class PaxosRepairHistory { List<ByteBuffer> elements = TYPE.unpack(tuples.get(i)); if (i < tokenInclusiveUpperBounds.length) - tokenInclusiveUpperBounds[i] = TOKEN_FACTORY.fromByteArray(elements.get(0)); + tokenInclusiveUpperBounds[i] = partitioner.getTokenFactory().fromByteArray(elements.get(0)); ballotLowBounds[i] = Ballot.deserialize(elements.get(1)); } - return new PaxosRepairHistory(tokenInclusiveUpperBounds, ballotLowBounds); + return new PaxosRepairHistory(partitioner, tokenInclusiveUpperBounds, ballotLowBounds); } // append the item to the given list, modifying the underlying list @@ -200,7 +231,11 @@ public class PaxosRepairHistory if (historyRight == null) return historyLeft; - Builder builder = new Builder(historyLeft.size() + historyRight.size()); + assert historyLeft.partitioner == historyRight.partitioner : String.format("Mismatching partitioners (%s != %s)", + historyLeft.partitioner, + historyRight.partitioner); + + Builder builder = new Builder(historyLeft.partitioner, 
historyLeft.size() + historyRight.size()); RangeIterator left = historyLeft.rangeIterator(); RangeIterator right = historyRight.rangeIterator(); @@ -243,7 +278,7 @@ public class PaxosRepairHistory public static PaxosRepairHistory add(PaxosRepairHistory existing, Collection<Range<Token>> ranges, Ballot ballot) { ranges = Range.normalize(ranges); - Builder builder = new Builder(ranges.size() * 2); + Builder builder = new Builder(existing.partitioner, ranges.size() * 2); for (Range<Token> range : ranges) { // don't add a point for an opening min token, since it @@ -261,7 +296,7 @@ public class PaxosRepairHistory @VisibleForTesting static PaxosRepairHistory trim(PaxosRepairHistory existing, Collection<Range<Token>> ranges) { - Builder builder = new Builder(existing.size()); + Builder builder = new Builder(existing.partitioner, existing.size()); ranges = Range.normalize(ranges); for (Range<Token> select : ranges) @@ -310,10 +345,12 @@ public class PaxosRepairHistory else return a; } - public static final IVersionedSerializer<PaxosRepairHistory> serializer = new IVersionedSerializer<PaxosRepairHistory>() + public static final IVersionedSerializer<PaxosRepairHistory> serializer = new IVersionedSerializer<>() { public void serialize(PaxosRepairHistory history, DataOutputPlus out, int version) throws IOException { + if (version >= MessagingService.VERSION_51) + out.writeUTF(history.partitioner.getClass().getCanonicalName()); out.writeUnsignedVInt32(history.size()); for (int i = 0; i < history.size() ; ++i) { @@ -325,21 +362,27 @@ public class PaxosRepairHistory public PaxosRepairHistory deserialize(DataInputPlus in, int version) throws IOException { + IPartitioner partitioner = version >= MessagingService.VERSION_51 + ? 
FBUtilities.newPartitioner(in.readUTF()) + : IPartitioner.global(); int size = in.readUnsignedVInt32(); Token[] tokenInclusiveUpperBounds = new Token[size]; Ballot[] ballotLowBounds = new Ballot[size + 1]; for (int i = 0; i < size; i++) { - tokenInclusiveUpperBounds[i] = Token.serializer.deserialize(in, DatabaseDescriptor.getPartitioner(), version); + tokenInclusiveUpperBounds[i] = Token.serializer.deserialize(in, partitioner, version); ballotLowBounds[i] = Ballot.deserialize(in); } ballotLowBounds[size] = Ballot.deserialize(in); - return new PaxosRepairHistory(tokenInclusiveUpperBounds, ballotLowBounds); + return new PaxosRepairHistory(partitioner, tokenInclusiveUpperBounds, ballotLowBounds); } public long serializedSize(PaxosRepairHistory history, int version) { - long size = TypeSizes.sizeofUnsignedVInt(history.size()); + long size = 0; + if (version >= MessagingService.VERSION_51) + size += TypeSizes.sizeof(history.partitioner.getClass().getCanonicalName()); + size += TypeSizes.sizeofUnsignedVInt(history.size()); for (int i = 0; i < history.size() ; ++i) { size += Token.serializer.serializedSize(history.tokenInclusiveUpperBound[i], version); @@ -411,11 +454,13 @@ public class PaxosRepairHistory static class Builder { + final IPartitioner partitioner; final List<Token> tokenInclusiveUpperBounds; final List<Ballot> ballotLowBounds; - Builder(int capacity) + Builder(IPartitioner partitioner, int capacity) { + this.partitioner = partitioner; this.tokenInclusiveUpperBounds = new ArrayList<>(capacity); this.ballotLowBounds = new ArrayList<>(capacity + 1); } @@ -473,7 +518,9 @@ public class PaxosRepairHistory { if (tokenInclusiveUpperBounds.size() == ballotLowBounds.size()) ballotLowBounds.add(Ballot.none()); - return new PaxosRepairHistory(tokenInclusiveUpperBounds.toArray(new Token[0]), ballotLowBounds.toArray(new Ballot[0])); + return new PaxosRepairHistory(partitioner, + tokenInclusiveUpperBounds.toArray(new Token[0]), + ballotLowBounds.toArray(new Ballot[0])); } 
} } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java index c0c9a7e1d4..682375668a 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupComplete.java @@ -24,6 +24,7 @@ import java.util.*; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailureReason; @@ -37,10 +38,10 @@ import org.apache.cassandra.net.RequestCallbackWithFailure; import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.utils.concurrent.AsyncFuture; -import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.PAXOS2_CLEANUP_COMPLETE_REQ; @@ -124,11 +125,13 @@ public class PaxosCleanupComplete extends AsyncFuture<Void> implements RequestCa { TableId tableId = TableId.deserialize(in); Ballot lowBound = Ballot.deserialize(in); + TableMetadata table = Schema.instance.getTableMetadata(tableId); + IPartitioner partitioner = table != null ? 
table.partitioner : IPartitioner.global(); int numRanges = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(); for (int i = 0; i < numRanges; i++) { - Range<Token> range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, getPartitioner(), version); + Range<Token> range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, partitioner, version); ranges.add(range); } return new Request(tableId, lowBound, ranges); diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java index 70b4099eb7..04907e7964 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupHistory.java @@ -40,7 +40,7 @@ public class PaxosCleanupHistory this.history = history; } - public static final IVersionedSerializer<PaxosCleanupHistory> serializer = new IVersionedSerializer<PaxosCleanupHistory>() + public static final IVersionedSerializer<PaxosCleanupHistory> serializer = new IVersionedSerializer<>() { public void serialize(PaxosCleanupHistory message, DataOutputPlus out, int version) throws IOException { diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java index 6d3fb731ea..103441940b 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanupRequest.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.FutureCallback; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import 
org.apache.cassandra.io.IVersionedSerializer; @@ -40,7 +41,9 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageFlag; import org.apache.cassandra.repair.SharedContext; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.utils.UUIDSerializer; @@ -129,12 +132,13 @@ public class PaxosCleanupRequest { UUID session = UUIDSerializer.serializer.deserialize(in, version); TableId tableId = TableId.deserialize(in); - + TableMetadata table = Schema.instance.getTableMetadata(tableId); + IPartitioner partitioner = table != null ? table.partitioner : IPartitioner.global(); int numRanges = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(numRanges); for (int i=0; i<numRanges; i++) { - ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, DatabaseDescriptor.getPartitioner(), version)); + ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, partitioner, version)); } return new PaxosCleanupRequest(session, tableId, ranges); } diff --git a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java index 41735526a4..ac9da8e0e7 100644 --- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java +++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosStartPrepareCleanup.java @@ -24,9 +24,9 @@ import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import 
org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RequestFailureReason; @@ -42,6 +42,7 @@ import org.apache.cassandra.net.RequestCallbackWithFailure; import org.apache.cassandra.repair.SharedContext; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosRepairHistory; @@ -159,12 +160,13 @@ public class PaxosStartPrepareCleanup extends AsyncFuture<PaxosCleanupHistory> i { TableId tableId = TableId.deserialize(in); EndpointState epState = EndpointState.serializer.deserialize(in, version); - + TableMetadata table = Schema.instance.getTableMetadata(tableId); + IPartitioner partitioner = table != null ? table.partitioner : IPartitioner.global(); int numRanges = in.readInt(); List<Range<Token>> ranges = new ArrayList<>(); for (int i = 0; i < numRanges; i++) { - Range<Token> range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, DatabaseDescriptor.getPartitioner(), version); + Range<Token> range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, partitioner, version); ranges.add(range); } return new Request(tableId, epState, ranges); diff --git a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java index 1b5f6a07fe..69494ee60e 100644 --- a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java +++ b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosStateTracker.java @@ -46,6 +46,7 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.io.util.File; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; import 
org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.Commit; @@ -270,12 +271,13 @@ public class PaxosStateTracker Row row = partition.next(); Clustering clustering = row.clustering(); String tableName = UTF8Type.instance.compose(clustering.get(0), clustering.accessor()); - if (Schema.instance.getTableMetadata(keyspaceName, tableName) == null) + TableMetadata tm = Schema.instance.getTableMetadata(keyspaceName, tableName); + if (tm == null) continue; Cell pointsCell = row.getCell(pointsColumn); List<ByteBuffer> points = listType.compose(pointsCell.value(), pointsCell.accessor()); - PaxosRepairHistory history = PaxosRepairHistory.fromTupleBufferList(points); + PaxosRepairHistory history = PaxosRepairHistory.fromTupleBufferList(tm.partitioner, points); lowBound = Commit.latest(lowBound, history.maxLowBound()); } } diff --git a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java index 062de3cf59..44d147e134 100644 --- a/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java +++ b/src/java/org/apache/cassandra/service/paxos/uncommitted/PaxosUncommittedTracker.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.File; @@ -45,6 +46,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; 
import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState; import org.apache.cassandra.utils.CloseableIterator; @@ -191,7 +193,12 @@ public class PaxosUncommittedTracker public CloseableIterator<UncommittedPaxosKey> uncommittedKeyIterator(TableId tableId, Collection<Range<Token>> ranges) { - ranges = (ranges == null || ranges.isEmpty()) ? Collections.singleton(FULL_RANGE) : Range.normalize(ranges); + TableMetadata table = Schema.instance.getTableMetadata(tableId); + if (table == null || table.partitioner != IPartitioner.global()) + ranges = Collections.singleton(FULL_RANGE); + else + ranges = (ranges == null || ranges.isEmpty()) ? Collections.singleton(FULL_RANGE) : Range.normalize(ranges); + CloseableIterator<PaxosKeyState> updates = updateSupplier.repairIterator(tableId, ranges); try diff --git a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java index b2a5004b80..e440d60a1b 100644 --- a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java +++ b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedDataFile.java @@ -34,7 +34,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.PeekingIterator; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.Range; @@ -343,7 +342,7 @@ public class UncommittedDataFile nextKey: while (!reader.isEOF()) { - DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(reader)); + DecoratedKey key = currentRange.left.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(reader)); while (!currentRange.contains(key)) { diff --git a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java 
b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java index 3130333813..bfc31c6d85 100644 --- a/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java +++ b/src/java/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableData.java @@ -47,6 +47,7 @@ import org.apache.cassandra.concurrent.ExecutorPlus; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.SchemaElement; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.FSReadError; @@ -109,6 +110,7 @@ public class UncommittedTableData private final CloseableIterator<PaxosKeyState> wrapped; private final PeekingIterator<PaxosKeyState> peeking; private final PeekingIterator<Range<Token>> rangeIterator; + private final IPartitioner partitioner; private final PaxosRepairHistory.Searcher historySearcher; FilteringIterator(CloseableIterator<PaxosKeyState> wrapped, List<Range<Token>> ranges, PaxosRepairHistory history) @@ -116,6 +118,7 @@ public class UncommittedTableData this.wrapped = wrapped; this.peeking = Iterators.peekingIterator(wrapped); this.rangeIterator = Iterators.peekingIterator(Range.normalize(ranges).iterator()); + this.partitioner = history.partitioner; this.historySearcher = history.searcher(); } @@ -128,35 +131,25 @@ public class UncommittedTableData Range<Token> range = rangeIterator.peek(); - PaxosKeyState peeked = peeking.peek(); - Token token = peeked.key.getToken(); - - // If repairing the distributed metadata log table, we skip the filtering of paxos keys where the token - // is outside the range of the repair. This check would be complicated by the fact that the system.paxos - // table keys are tokenized with the global partitioner, but the log table uses its own specific - // partitioner. 
This means that the repair ranges will be a ReversedLongLocalToken pair, while the - // tokens obtained from the PaxosKeyState iterator will be whatever the global partitioner uses. - // However, as the replicas of the distributed log table (i.e. CMS members) always replicate the entire - // table, the range check is superfluous in this case anyway. - // For the purposes of the actual paxos repair (and for paxos reads/writes in general), this mismatch is - // also fine as the keys/tokens are opaque to the paxos implementation itself. - if (range.left.getPartitioner() == MetaStrategy.partitioner) + Token token = peeking.peek().key.getToken(); + if (!range.contains(token)) { - assert peeked.tableId.equals(DistributedMetadataLogKeyspace.LOG_TABLE_ID); - } - else - { - if (!range.contains(token)) - { - if (range.right.compareTo(token) < 0) - rangeIterator.next(); - else - peeking.next(); - continue; - } + if (!range.right.isMinimum() && range.right.compareTo(token) < 0) + rangeIterator.next(); + else + peeking.next(); + continue; } PaxosKeyState next = peeking.next(); + // If repairing a table with a partitioner different from IPartitioner.global(), such as the distributed + // metadata log table, we don't filter paxos keys outside the data range of the repair. Instead, we + // repair everything present for that table. Replicas of the distributed log table (i.e. CMS members) + // always replicate the entire table, so this is not much of an issue at present. + // In this case, we also need to obtain the appropriate token for the paxos key, according to the + // table specific partitioner, in order to look up the low bound ballot for it in the repair history.
+ if (partitioner != IPartitioner.global()) + token = partitioner.getToken(next.key.getKey()); Ballot lowBound = historySearcher.ballotForToken(token); if (Commit.isAfter(lowBound, next.ballot)) @@ -204,6 +197,10 @@ public class UncommittedTableData if (table == null) return Range.normalize(FULL_RANGE); + // for tables using a different partitioner to the globally configured one, don't filter anything + if (table.getPartitioner() != IPartitioner.global()) + return Range.normalize(FULL_RANGE); + String ksName = table.getKeyspaceName(); Collection<Range<Token>> ranges = StorageService.instance.getLocalAndPendingRanges(ksName); @@ -218,7 +215,12 @@ public class UncommittedTableData { ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tableId); if (cfs == null) - return PaxosRepairHistory.EMPTY; + { + IPartitioner partitioner = tableId.equals(DistributedMetadataLogKeyspace.LOG_TABLE_ID) + ? MetaStrategy.partitioner + : IPartitioner.global(); + return PaxosRepairHistory.empty(partitioner); + } return cfs.getPaxosRepairHistory(); } diff --git a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java index 5329ceef0d..dcae575940 100644 --- a/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java +++ b/src/java/org/apache/cassandra/utils/DiagnosticSnapshotService.java @@ -22,7 +22,10 @@ import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -30,7 +33,9 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.*; 
+import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SnapshotCommand; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; @@ -41,10 +46,9 @@ import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; import static org.apache.cassandra.config.CassandraRelevantProperties.DIAGNOSTIC_SNAPSHOT_INTERVAL_NANOS; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; -import static org.apache.cassandra.net.ParamType.SNAPSHOT_RANGES; /** * Provides a means to take snapshots when triggered by anomalous events or when the breaking of invariants is @@ -108,10 +112,10 @@ public class DiagnosticSnapshotService || command.snapshot_name.startsWith(DUPLICATE_ROWS_DETECTED_SNAPSHOT_PREFIX); } - public static void snapshot(SnapshotCommand command, List<Range<Token>> ranges, InetAddressAndPort initiator) + public static void snapshot(SnapshotCommand command, InetAddressAndPort initiator) { Preconditions.checkArgument(isDiagnosticSnapshotRequest(command)); - instance.maybeSnapshot(command, ranges, initiator); + instance.maybeSnapshot(command, command.ranges, initiator); } public static String getSnapshotName(String prefix) @@ -138,14 +142,16 @@ public class DiagnosticSnapshotService long interval = DIAGNOSTIC_SNAPSHOT_INTERVAL_NANOS.getLong(); if (now - last > interval && cached.compareAndSet(last, now)) { + if (ranges.size() > MAX_SNAPSHOT_RANGE_COUNT) + ranges = Collections.emptyList(); + Message<SnapshotCommand> msg = Message.out(Verb.SNAPSHOT_REQ, new SnapshotCommand(metadata.keyspace, metadata.name, + ranges, getSnapshotName(prefix), false)); - if (!ranges.isEmpty() && ranges.size() < 
MAX_SNAPSHOT_RANGE_COUNT) - msg = msg.withParam(SNAPSHOT_RANGES, ranges); for (InetAddressAndPort replica : endpoints) MessagingService.instance().send(msg, replica); } diff --git a/src/java/org/apache/cassandra/utils/RangesSerializer.java b/src/java/org/apache/cassandra/utils/RangesSerializer.java deleted file mode 100644 index 5707503f6b..0000000000 --- a/src/java/org/apache/cassandra/utils/RangesSerializer.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.utils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; - -public class RangesSerializer implements IVersionedSerializer<Collection<Range<Token>>> -{ - public static final RangesSerializer serializer = new RangesSerializer(); - - @Override - public void serialize(Collection<Range<Token>> ranges, DataOutputPlus out, int version) throws IOException - { - out.writeInt(ranges.size()); - for (Range<Token> r : ranges) - { - Token.serializer.serialize(r.left, out, version); - Token.serializer.serialize(r.right, out, version); - } - } - - @Override - public Collection<Range<Token>> deserialize(DataInputPlus in, int version) throws IOException - { - int count = in.readInt(); - List<Range<Token>> ranges = new ArrayList<>(count); - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - for (int i = 0; i < count; i++) - { - Token start = Token.serializer.deserialize(in, partitioner, version); - Token end = Token.serializer.deserialize(in, partitioner, version); - ranges.add(new Range<>(start, end)); - } - return ranges; - } - - @Override - public long serializedSize(Collection<Range<Token>> ranges, int version) - { - int size = TypeSizes.sizeof(ranges.size()); - if (ranges.size() > 0) - size += ranges.size() * 2 * Token.serializer.serializedSize(ranges.iterator().next().left, version); - return size; - } -} diff --git a/test/data/serialization/5.1/service.SyncComplete.bin b/test/data/serialization/5.1/service.SyncComplete.bin index 69ddbdda04..b5f3633e7b 100644 Binary files 
a/test/data/serialization/5.1/service.SyncComplete.bin and b/test/data/serialization/5.1/service.SyncComplete.bin differ diff --git a/test/data/serialization/5.1/service.SyncRequest.bin b/test/data/serialization/5.1/service.SyncRequest.bin index d1046c1435..f853b20f9c 100644 Binary files a/test/data/serialization/5.1/service.SyncRequest.bin and b/test/data/serialization/5.1/service.SyncRequest.bin differ diff --git a/test/data/serialization/5.1/service.ValidationComplete.bin b/test/data/serialization/5.1/service.ValidationComplete.bin index 85c0940b20..888aa1a5a8 100644 Binary files a/test/data/serialization/5.1/service.ValidationComplete.bin and b/test/data/serialization/5.1/service.ValidationComplete.bin differ diff --git a/test/data/serialization/5.1/service.ValidationRequest.bin b/test/data/serialization/5.1/service.ValidationRequest.bin index 5f4cbf664b..04c492a8a1 100644 Binary files a/test/data/serialization/5.1/service.ValidationRequest.bin and b/test/data/serialization/5.1/service.ValidationRequest.bin differ diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/PaxosRepairTCMTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/PaxosRepairTCMTest.java new file mode 100644 index 0000000000..3a2bb0f112 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/PaxosRepairTCMTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.tcm; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +public class PaxosRepairTCMTest extends TestBaseImpl +{ + @Test + public void paxosRepairWithReversePartitionerTest() throws IOException, InterruptedException + { + try (Cluster cluster = init(Cluster.build(2).withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + cluster.get(1).nodetoolResult("cms", "reconfigure", "2").asserts().success(); + cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key)")); + for (int i = 0; i < 1000; i++) + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ALL, i); + cluster.forEach(i -> i.flush(KEYSPACE)); + + // flush system.paxos to create some UncommittedTableData files + for (int i = 0; i < 3; i++) + { + for (int j = 0; j < 3; j++) + cluster.schemaChange(withKeyspace(String.format("alter table %%s.tbl with comment = 'comment " + i + j + "'", i, j))); + cluster.forEach(inst -> inst.flush("system")); + } + + // unflushed rows in the system.paxos memtable + for (int i = 0; i < 3; i++) + cluster.schemaChange(withKeyspace(String.format("alter table %%s.tbl with comment = 'comment x " + i + "'", i))); + + long start = Long.MIN_VALUE; + long interval = Long.MAX_VALUE/5; + long end = start + 
interval; + for (int i = 0; i < 10; i++) + { + cluster.get(1).nodetoolResult("repair", "-full", "-st", String.valueOf(start), "-et", String.valueOf(end)); + start += interval; + end += interval; + } + + cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)")); + } + } +} diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java index 8d15f678e3..006ace0fb5 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreMBeanTest.java @@ -77,6 +77,7 @@ public class ColumnFamilyStoreMBeanTest public void testInvalidateTokenRangesFormat() { ColumnFamilyStore store = Mockito.mock(ColumnFamilyStore.class); + Mockito.when(store.getPartitioner()).thenReturn(ByteOrderedPartitioner.instance); Mockito.doCallRealMethod().when(store).forceCompactionForTokenRanges(Mockito.any()); IPartitioner previous = DatabaseDescriptor.getPartitioner(); try diff --git a/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java b/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java index cd13616ccd..9846895a12 100644 --- a/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java +++ b/test/unit/org/apache/cassandra/service/paxos/PaxosRepairHistoryTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.Lists; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; @@ -116,7 +117,7 @@ public class PaxosRepairHistoryTest ballots[i] = points[i].right; } ballots[length - 1] = length == points.length ? 
points[length - 1].right : none(); - return new PaxosRepairHistory(tokens, ballots); + return new PaxosRepairHistory(IPartitioner.global(), tokens, ballots); } static @@ -127,7 +128,7 @@ public class PaxosRepairHistoryTest private static class Builder { - PaxosRepairHistory history = PaxosRepairHistory.EMPTY; + PaxosRepairHistory history = PaxosRepairHistory.empty(); Builder add(Ballot ballot, Range<Token>... ranges) { @@ -137,7 +138,7 @@ public class PaxosRepairHistoryTest Builder clear() { - history = PaxosRepairHistory.EMPTY; + history = PaxosRepairHistory.empty(); return this; } } @@ -149,7 +150,7 @@ public class PaxosRepairHistoryTest private static void checkSystemTableIO(PaxosRepairHistory history) { - Assert.assertEquals(history, PaxosRepairHistory.fromTupleBufferList(history.toTupleBufferList())); + Assert.assertEquals(history, PaxosRepairHistory.fromTupleBufferList(IPartitioner.global(), history.toTupleBufferList())); String tableName = "test" + tableNum.getAndIncrement(); SystemKeyspace.savePaxosRepairHistory("test", tableName, history, false); Assert.assertEquals(history, SystemKeyspace.loadPaxosRepairHistory("test", tableName)); @@ -245,7 +246,7 @@ public class PaxosRepairHistoryTest public void testRegression() { Assert.assertEquals(none(), trim( - new PaxosRepairHistory( + new PaxosRepairHistory(IPartitioner.global(), tks(-9223372036854775807L, -3952873730080618203L, -1317624576693539401L, 1317624576693539401L, 6588122883467697005L), uuids("1382954c-1dd2-11b2-8fb2-f45d70d6d6d8", "138260a4-1dd2-11b2-abb2-c13c36b179e1", "1382951a-1dd2-11b2-1dd8-b7e242b38dbe", "138294fc-1dd2-11b2-83c4-43fb3a552386", "13829510-1dd2-11b2-f353-381f2ed963fa", "1382954c-1dd2-11b2-8fb2-f45d70d6d6d8")), Collections.singleton(new Range<>(new LongToken(-1317624576693539401L), new LongToken(1317624576693539401L)))) @@ -256,8 +257,8 @@ public class PaxosRepairHistoryTest public void testInequality() { Collection<Range<Token>> ranges = Collections.singleton(new 
Range<>(Murmur3Partitioner.MINIMUM, Murmur3Partitioner.MINIMUM)); - PaxosRepairHistory a = PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ranges, none()); - PaxosRepairHistory b = PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ranges, nextBallot(NONE)); + PaxosRepairHistory a = PaxosRepairHistory.add(PaxosRepairHistory.empty(), ranges, none()); + PaxosRepairHistory b = PaxosRepairHistory.add(PaxosRepairHistory.empty(), ranges, nextBallot(NONE)); Assert.assertNotEquals(a, b); } @@ -342,7 +343,7 @@ public class PaxosRepairHistoryTest } } - PaxosRepairHistory merged = PaxosRepairHistory.EMPTY; + PaxosRepairHistory merged = PaxosRepairHistory.empty(Murmur3Partitioner.instance); for (PaxosRepairHistory split : splits) merged = PaxosRepairHistory.merge(merged, split); @@ -418,7 +419,7 @@ public class PaxosRepairHistoryTest static class RandomPaxosRepairHistory { - PaxosRepairHistory test = PaxosRepairHistory.EMPTY; + PaxosRepairHistory test = PaxosRepairHistory.empty(Murmur3Partitioner.instance); void add(Collection<Range<Token>> ranges, Ballot ballot) { @@ -502,7 +503,7 @@ public class PaxosRepairHistoryTest void serdeser() { - PaxosRepairHistory tmp = PaxosRepairHistory.fromTupleBufferList(test.toTupleBufferList()); + PaxosRepairHistory tmp = PaxosRepairHistory.fromTupleBufferList(Murmur3Partitioner.instance, test.toTupleBufferList()); Assert.assertEquals(test, tmp); test = tmp; } diff --git a/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java b/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java index 70b1f254c1..e275bcdd89 100644 --- a/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java +++ b/test/unit/org/apache/cassandra/service/paxos/uncommitted/UncommittedTableDataTest.java @@ -91,7 +91,7 @@ public class UncommittedTableDataTest PaxosRepairHistory getPaxosRepairHistory() { - return PaxosRepairHistory.EMPTY; + return PaxosRepairHistory.empty(PARTITIONER); 
} }; @@ -583,7 +583,7 @@ public class UncommittedTableDataTest PaxosRepairHistory getPaxosRepairHistory() { - return PaxosRepairHistory.EMPTY; + return PaxosRepairHistory.empty(PARTITIONER); } }); @@ -608,7 +608,7 @@ public class UncommittedTableDataTest PaxosRepairHistory getPaxosRepairHistory() { - return PaxosRepairHistory.EMPTY; + return PaxosRepairHistory.empty(PARTITIONER); } }); @@ -636,7 +636,7 @@ public class UncommittedTableDataTest PaxosRepairHistory getPaxosRepairHistory() { - return PaxosRepairHistory.add(PaxosRepairHistory.EMPTY, ALL_RANGES, ballots[1]); + return PaxosRepairHistory.add(PaxosRepairHistory.empty(PARTITIONER), ALL_RANGES, ballots[1]); } }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org