This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 34d999c47a Optimisations to PlacementForRange, improve lookup on r/w path 34d999c47a is described below commit 34d999c47a4da6d43a67910354fb9888184b23ab Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Mar 20 15:53:50 2024 +0100 Optimisations to PlacementForRange, improve lookup on r/w path Patch by marcuse and Sam Tunnicliffe; reviewed by Sam Tunnicliffe for CASSANDRA-19191 Co-authored-by: Sam Tunnicliffe <s...@apache.org> Co-authored-by: Marcus Eriksson <marc...@apache.org> --- .../apache/cassandra/locator/LocalStrategy.java | 6 +- .../cassandra/locator/NetworkTopologyStrategy.java | 6 +- .../apache/cassandra/locator/SimpleStrategy.java | 6 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 16 +- .../cassandra/tcm/ownership/DataPlacement.java | 68 +++---- .../cassandra/tcm/ownership/DataPlacements.java | 14 +- .../{PlacementForRange.java => ReplicaGroups.java} | 206 +++++++++++++-------- .../org/apache/cassandra/tcm/sequences/Move.java | 4 +- .../cassandra/tcm/sequences/RemoveNodeStreams.java | 4 +- .../cassandra/distributed/shared/ClusterUtils.java | 4 +- .../test/log/MetadataChangeSimulationTest.java | 26 +-- .../test/log/OperationalEquivalenceTest.java | 4 +- .../distributed/test/log/SimulatedOperation.java | 4 +- .../distributed/test/ring/RangeVersioningTest.java | 4 +- .../test/microbench/ReplicaGroupsBench.java | 138 ++++++++++++++ .../tcm/compatibility/GossipHelperTest.java | 6 +- .../tcm/ownership/UniformRangePlacementTest.java | 68 ++++--- .../InProgressSequenceCancellationTest.java | 18 +- .../cassandra/tcm/sequences/SequencesUtils.java | 2 +- 19 files changed, 392 insertions(+), 212 deletions(-) diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java index 69193090c4..4032ce1594 100644 --- a/src/java/org/apache/cassandra/locator/LocalStrategy.java +++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java @@ -26,7 +26,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.ownership.DataPlacement; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.utils.FBUtilities; @@ -65,7 +65,7 @@ public class LocalStrategy extends SystemStrategy { public static final Range<Token> entireRange = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMinimumToken()); public static final EndpointsForRange localReplicas = EndpointsForRange.of(new Replica(FBUtilities.getBroadcastAddressAndPort(), entireRange, true)); - public static final DataPlacement placement = new DataPlacement(PlacementForRange.builder().withReplicaGroup(VersionedEndpoints.forRange(Epoch.FIRST, localReplicas)).build(), - PlacementForRange.builder().withReplicaGroup(VersionedEndpoints.forRange(Epoch.FIRST, localReplicas)).build()); + public static final DataPlacement placement = new DataPlacement(ReplicaGroups.builder().withReplicaGroup(VersionedEndpoints.forRange(Epoch.FIRST, localReplicas)).build(), + ReplicaGroups.builder().withReplicaGroup(VersionedEndpoints.forRange(Epoch.FIRST, localReplicas)).build()); } } diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index d48ee31610..05bfcfb9ed 100644 --- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@ -49,7 +49,7 @@ import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.DataPlacement; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.utils.FBUtilities; @@ -194,7 +194,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy Directory directory, TokenMap tokenMap) { - PlacementForRange.Builder builder = PlacementForRange.builder(); + ReplicaGroups.Builder builder = ReplicaGroups.builder(); for (Range<Token> range : ranges) { EndpointsForRange endpointsForRange = calculateNaturalReplicas(range.right, @@ -205,7 +205,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy builder.withReplicaGroup(VersionedEndpoints.forRange(epoch, endpointsForRange)); } - PlacementForRange built = builder.build(); + ReplicaGroups built = builder.build(); return new DataPlacement(built, built); } diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 24893e5f1e..380f1a03e5 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -41,7 +41,7 @@ import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.DataPlacement; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; @@ -69,12 +69,12 @@ public class SimpleStrategy extends AbstractReplicationStrategy @Override public DataPlacement calculateDataPlacement(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata) { - PlacementForRange.Builder builder = PlacementForRange.builder(); + ReplicaGroups.Builder builder = ReplicaGroups.builder(); for (Range<Token> range : ranges) builder.withReplicaGroup(VersionedEndpoints.forRange(epoch, calculateNaturalReplicas(range.right, metadata.tokenMap.tokens(), range, metadata.directory, metadata.tokenMap))); - PlacementForRange built = builder.build(); + ReplicaGroups built = builder.build(); return new DataPlacement(built, built); } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 51a6d1f2e6..33886bec40 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -64,7 +64,7 @@ import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.tcm.sequences.InProgressSequences; @@ -282,8 +282,8 @@ public class ClusterMetadata // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token) { - PlacementForRange writes = placements.get(ksm.params.replication).writes; - PlacementForRange reads = placements.get(ksm.params.replication).reads; + ReplicaGroups writes = placements.get(ksm.params.replication).writes; + ReplicaGroups reads = placements.get(ksm.params.replication).reads; if (ksm.params.replication.isMeta()) return !reads.equals(writes); return !reads.forToken(token).equals(writes.forToken(token)); @@ -292,8 +292,8 @@ public class ClusterMetadata // TODO Remove this as it isn't really an equivalent to the previous concept of pending ranges public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint) { - PlacementForRange writes = placements.get(ksm.params.replication).writes; - PlacementForRange reads = placements.get(ksm.params.replication).reads; + ReplicaGroups writes = placements.get(ksm.params.replication).writes; + ReplicaGroups reads = placements.get(ksm.params.replication).reads; return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint)); } @@ -311,8 +311,8 @@ public class ClusterMetadata public Map<Range<Token>, VersionedEndpoints.ForRange> pendingRanges(KeyspaceMetadata metadata) { Map<Range<Token>, VersionedEndpoints.ForRange> map = new HashMap<>(); - PlacementForRange writes = placements.get(metadata.params.replication).writes; - PlacementForRange reads = placements.get(metadata.params.replication).reads; + ReplicaGroups writes = placements.get(metadata.params.replication).writes; + ReplicaGroups reads = placements.get(metadata.params.replication).reads; // first, pending ranges as the result of range splitting or merging // i.e. new ranges being created through join/leave @@ -323,7 +323,7 @@ public class ClusterMetadata // next, ranges where the ranges themselves are not changing, but the replicas are // i.e. replacement or RF increase - writes.replicaGroups().forEach((range, endpoints) -> { + writes.forEach((range, endpoints) -> { VersionedEndpoints.ForRange readGroup = reads.forRange(range); if (!readGroup.equals(endpoints)) map.put(range, VersionedEndpoints.forRange(endpoints.lastModified(), diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java index c37137b357..f42d8b5479 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java +++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java @@ -21,7 +21,6 @@ package org.apache.cassandra.tcm.ownership; import java.io.IOException; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; @@ -47,16 +46,16 @@ public class DataPlacement return replication.isMeta() ? metaKeyspaceSerializer : globalSerializer; } - private static final DataPlacement EMPTY = new DataPlacement(PlacementForRange.EMPTY, PlacementForRange.EMPTY); + private static final DataPlacement EMPTY = new DataPlacement(ReplicaGroups.EMPTY, ReplicaGroups.EMPTY); // TODO make tree of just EndpointsForRange, navigable by EFR.range() // TODO combine peers into a single entity with one vote in any quorum // (e.g. old & new peer must both respond to count one replica) - public final PlacementForRange reads; - public final PlacementForRange writes; + public final ReplicaGroups reads; + public final ReplicaGroups writes; - public DataPlacement(PlacementForRange reads, - PlacementForRange writes) + public DataPlacement(ReplicaGroups reads, + ReplicaGroups writes) { this.reads = reads; this.writes = writes; @@ -77,14 +76,14 @@ public class DataPlacement public DataPlacement combineReplicaGroups(DataPlacement other) { - return new DataPlacement(PlacementForRange.builder() - .withReplicaGroups(reads.replicaGroups().values()) - .withReplicaGroups(other.reads.replicaGroups.values()) - .build(), - PlacementForRange.builder() - .withReplicaGroups(writes.replicaGroups().values()) - .withReplicaGroups(other.writes.replicaGroups.values()) - .build()); + return new DataPlacement(ReplicaGroups.builder() + .withReplicaGroups(reads.endpoints) + .withReplicaGroups(other.reads.endpoints) + .build(), + ReplicaGroups.builder() + .withReplicaGroups(writes.endpoints) + .withReplicaGroups(other.writes.endpoints) + .build()); } public PlacementDeltas.PlacementDelta difference(DataPlacement next) @@ -95,8 +94,8 @@ public class DataPlacement public DataPlacement splitRangesForPlacement(List<Token> tokens) { - return new DataPlacement(PlacementForRange.splitRangesForPlacement(tokens, reads), - PlacementForRange.splitRangesForPlacement(tokens, writes)); + return new DataPlacement(ReplicaGroups.splitRangesForPlacement(tokens, reads), + ReplicaGroups.splitRangesForPlacement(tokens, writes)); } public static DataPlacement empty() @@ -106,8 +105,8 @@ public class DataPlacement public static Builder builder() { - return new Builder(PlacementForRange.builder(), - PlacementForRange.builder()); + return new Builder(ReplicaGroups.builder(), + ReplicaGroups.builder()); } public Builder unbuild() @@ -116,10 +115,10 @@ public class DataPlacement } public static class Builder { - public final PlacementForRange.Builder reads; - public final PlacementForRange.Builder writes; + public final ReplicaGroups.Builder reads; + public final ReplicaGroups.Builder writes; - public Builder(PlacementForRange.Builder reads, PlacementForRange.Builder writes) + public Builder(ReplicaGroups.Builder reads, ReplicaGroups.Builder writes) { this.reads = reads; this.writes = writes; @@ -165,22 +164,11 @@ public class DataPlacement public String toString() { return "DataPlacement{" + - "reads=" + toString(reads.replicaGroups) + - ", writes=" + toString(writes.replicaGroups) + + "reads=" + reads + + ", writes=" + writes + '}'; } - public static String toString(Map<?, ?> predicted) - { - StringBuilder sb = new StringBuilder(); - for (Map.Entry<?, ?> e : predicted.entrySet()) - { - sb.append(e.getKey()).append("=").append(e.getValue()).append(",\n"); - } - - return sb.toString(); - } - @Override public boolean equals(Object o) { @@ -206,21 +194,21 @@ public class DataPlacement public void serialize(DataPlacement t, DataOutputPlus out, Version version) throws IOException { - PlacementForRange.serializer.serialize(t.reads, out, partitioner, version); - PlacementForRange.serializer.serialize(t.writes, out, partitioner, version); + ReplicaGroups.serializer.serialize(t.reads, out, partitioner, version); + ReplicaGroups.serializer.serialize(t.writes, out, partitioner, version); } public DataPlacement deserialize(DataInputPlus in, Version version) throws IOException { - PlacementForRange reads = PlacementForRange.serializer.deserialize(in, partitioner, version); - PlacementForRange writes = PlacementForRange.serializer.deserialize(in, partitioner, version); + ReplicaGroups reads = ReplicaGroups.serializer.deserialize(in, partitioner, version); + ReplicaGroups writes = ReplicaGroups.serializer.deserialize(in, partitioner, version); return new DataPlacement(reads, writes); } public long serializedSize(DataPlacement t, Version version) { - return PlacementForRange.serializer.serializedSize(t.reads, partitioner, version) + - PlacementForRange.serializer.serializedSize(t.writes, partitioner, version); + return ReplicaGroups.serializer.serializedSize(t.reads, partitioner, version) + + ReplicaGroups.serializer.serializedSize(t.writes, partitioner, version); } } } diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java index 5d598a8191..988d2b1bcb 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java +++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java @@ -101,9 +101,9 @@ public class DataPlacements extends ReplicationMap<DataPlacement> implements Met DatabaseDescriptor.getPartitioner().getMinimumToken(), DatabaseDescriptor.getPartitioner().getMinimumToken(), true)); - PlacementForRange placement = PlacementForRange.builder(1) - .withReplicaGroup(VersionedEndpoints.forRange(Epoch.EMPTY, endpoints)) - .build(); + ReplicaGroups placement = ReplicaGroups.builder(1) + .withReplicaGroup(VersionedEndpoints.forRange(Epoch.EMPTY, endpoints)) + .build(); LOCAL_PLACEMENT = new DataPlacement(placement, placement); } return LOCAL_PLACEMENT; @@ -149,13 +149,13 @@ public class DataPlacements extends ReplicationMap<DataPlacement> implements Met builder.with(params, placement); else { - PlacementForRange.Builder reads = PlacementForRange.builder(placement.reads.replicaGroups().size()); - placement.reads.replicaGroups().forEach((range, endpoints) -> { + ReplicaGroups.Builder reads = ReplicaGroups.builder(placement.reads.size()); + placement.reads.endpoints.forEach((endpoints) -> { reads.withReplicaGroup(VersionedEndpoints.forRange(endpoints.lastModified(), endpoints.get().sorted(comparator))); }); - PlacementForRange.Builder writes = PlacementForRange.builder(placement.writes.replicaGroups().size()); - placement.writes.replicaGroups().forEach((range, endpoints) -> { + ReplicaGroups.Builder writes = ReplicaGroups.builder(placement.writes.size()); + placement.writes.endpoints.forEach((endpoints) -> { writes.withReplicaGroup(VersionedEndpoints.forRange(endpoints.lastModified(), endpoints.get().sorted(comparator))); }); diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java b/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java similarity index 72% rename from src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java rename to src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java index 0169279d3e..fbf84b57ae 100644 --- a/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java +++ b/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java @@ -20,9 +20,12 @@ package org.apache.cassandra.tcm.ownership; import java.io.IOException; import java.util.*; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; import org.apache.cassandra.dht.IPartitioner; @@ -41,32 +44,60 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.serialization.PartitionerAwareMetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.AsymmetricOrdering; import static org.apache.cassandra.db.TypeSizes.sizeof; -public class PlacementForRange +public class ReplicaGroups { - public static final Serializer serializer = new Serializer(); + private static final AsymmetricOrdering<Range<Token>, Token> ordering = new AsymmetricOrdering<>() + { + @Override + public int compare(Range<Token> left, Range<Token> right) + { + return left.compareTo(right); + } - public static final PlacementForRange EMPTY = PlacementForRange.builder().build(); + @Override + public int compareAsymmetric(Range<Token> range, Token token) + { + if (token.isMinimum() && !range.right.isMinimum()) + return -1; + if (range.left.compareTo(token) >= 0) + return 1; + if (!range.right.isMinimum() && range.right.compareTo(token) < 0) + return -1; + return 0; + } + }; - final SortedMap<Range<Token>, VersionedEndpoints.ForRange> replicaGroups; + public static final Serializer serializer = new Serializer(); + public static final ReplicaGroups EMPTY = ReplicaGroups.builder().build(); - public PlacementForRange(Map<Range<Token>, VersionedEndpoints.ForRange> replicaGroups) - { - this.replicaGroups = new TreeMap<>(replicaGroups); - } + public final ImmutableList<Range<Token>> ranges; + public final ImmutableList<VersionedEndpoints.ForRange> endpoints; - @VisibleForTesting - public Map<Range<Token>, VersionedEndpoints.ForRange> replicaGroups() + public ReplicaGroups(Map<Range<Token>, VersionedEndpoints.ForRange> replicaGroups) { - return Collections.unmodifiableMap(replicaGroups); + ImmutableList.Builder<Range<Token>> rangesBuilder = ImmutableList.builderWithExpectedSize(replicaGroups.size()); + ImmutableList.Builder<VersionedEndpoints.ForRange> endpointsBuilder = ImmutableList.builderWithExpectedSize(replicaGroups.size()); + Range<Token> prev = null; + for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : ImmutableSortedMap.copyOf(replicaGroups, Comparator.comparing(o -> o.left)).entrySet()) + { + if (prev != null && prev.right.compareTo(entry.getKey().left) > 0 ) + throw new IllegalArgumentException("Got overlapping ranges in replica groups: " + replicaGroups); + prev = entry.getKey(); + rangesBuilder.add(entry.getKey()); + endpointsBuilder.add(entry.getValue()); + } + this.ranges = rangesBuilder.build(); + this.endpoints = endpointsBuilder.build(); } @VisibleForTesting public List<Range<Token>> ranges() { - List<Range<Token>> ranges = new ArrayList<>(replicaGroups.keySet()); + List<Range<Token>> ranges = new ArrayList<>(this.ranges); ranges.sort(Range::compareTo); return ranges; } @@ -76,7 +107,11 @@ public class PlacementForRange { // can't use range.isWrapAround() since range.unwrap() returns a wrapping range (right token is min value) assert range.right.compareTo(range.left) > 0 || range.right.equals(range.right.minValue()); - return replicaGroups.get(range); + // we're searching for an exact match to the input range here, can use standard binary search + int pos = Collections.binarySearch(ranges, range, Comparator.comparing(o -> o.left)); + if (pos >= 0 && pos < ranges.size() && ranges.get(pos).equals(range)) + return endpoints.get(pos); + return null; } /** @@ -86,40 +121,32 @@ public class PlacementForRange { EndpointsForRange.Builder builder = new EndpointsForRange.Builder(range); Epoch lastModified = Epoch.EMPTY; - - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : replicaGroups.entrySet()) + // find a range containing the *right* token for the given range - Range is start exclusive so if we looked for the + // left one we could get the wrong range + int pos = ordering.binarySearchAsymmetric(ranges, range.right, AsymmetricOrdering.Op.CEIL); + if (pos >= 0 && pos < ranges.size() && ranges.get(pos).contains(range)) { - if (entry.getKey().contains(range)) - { - lastModified = Epoch.max(lastModified, entry.getValue().lastModified()); - builder.addAll(entry.getValue().get(), ReplicaCollection.Builder.Conflict.ALL); - } + VersionedEndpoints.ForRange eps = endpoints.get(pos); + lastModified = eps.lastModified(); + builder.addAll(eps.get(), ReplicaCollection.Builder.Conflict.ALL); } - return VersionedEndpoints.forRange(lastModified, builder.build()); } public VersionedEndpoints.ForRange forRange(Token token) { - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : replicaGroups.entrySet()) - { - if (entry.getKey().contains(token)) - return entry.getValue(); - } - throw new IllegalStateException("Could not find range for token " + token + " in PlacementForRange: " + replicaGroups); + int pos = ordering.binarySearchAsymmetric(ranges, token, AsymmetricOrdering.Op.CEIL); + if (pos >= 0 && pos < endpoints.size()) + return endpoints.get(pos); + throw new IllegalStateException("Could not find range for token " + token + " in ReplicaGroups: " + this); } public VersionedEndpoints.ForToken forToken(Token token) { - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : replicaGroups.entrySet()) - { - if (entry.getKey().contains(token)) - return entry.getValue().forToken(token); - } - throw new IllegalStateException("Could not find range for token " + token + " in PlacementForRange: " + replicaGroups); + return forRange(token).forToken(token); } - public Delta difference(PlacementForRange next) + public Delta difference(ReplicaGroups next) { RangesByEndpoint oldMap = this.byEndpoint(); RangesByEndpoint newMap = next.byEndpoint(); @@ -130,8 +157,8 @@ public class PlacementForRange public RangesByEndpoint byEndpoint() { RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder(); - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> oldPlacement : this.replicaGroups.entrySet()) - oldPlacement.getValue().byEndpoint().forEach(builder::put); + for (int i = 0; i < endpoints.size(); i++) + endpoints.get(i).byEndpoint().forEach(builder::put); return builder.build(); } @@ -152,24 +179,52 @@ public class PlacementForRange return builder.build(); } - public PlacementForRange withCappedLastModified(Epoch lastModified) + public ReplicaGroups withCappedLastModified(Epoch lastModified) { SortedMap<Range<Token>, VersionedEndpoints.ForRange> copy = new TreeMap<>(); - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : replicaGroups.entrySet()) + for (int i = 0; i < ranges.size(); i++) { - Range<Token> range = entry.getKey(); - VersionedEndpoints.ForRange forRange = entry.getValue(); + Range<Token> range = ranges.get(i); + VersionedEndpoints.ForRange forRange = endpoints.get(i); if (forRange.lastModified().isAfter(lastModified)) forRange = forRange.withLastModified(lastModified); copy.put(range, forRange); } - return new PlacementForRange(copy); + return new ReplicaGroups(copy); + } + + + public int size() + { + return ranges.size(); + } + + public boolean isEmpty() + { + return size() == 0; + } + + @VisibleForTesting + public Map<Range<Token>, VersionedEndpoints.ForRange> asMap() + { + Map<Range<Token>, VersionedEndpoints.ForRange> map = new HashMap<>(); + for (int i = 0; i < size(); i++) + map.put(ranges.get(i), endpoints.get(i)); + return map; + } + + public void forEach(BiConsumer<Range<Token>, VersionedEndpoints.ForRange> consumer) + { + for (int i = 0; i < size(); i++) + consumer.accept(ranges.get(i), endpoints.get(i)); } @Override public String toString() { - return replicaGroups.toString(); + StringBuilder sb = new StringBuilder("ReplicaGroups{"); + forEach((range, eps) -> sb.append(range).append('=').append(eps).append(", ")); + return sb.append('}').toString(); } @VisibleForTesting @@ -184,17 +239,16 @@ public class PlacementForRange @VisibleForTesting public List<String> toReplicaStringList() { - return replicaGroups.values() - .stream() - .map(VersionedEndpoints.ForRange::get) - .flatMap(AbstractReplicaCollection::stream) - .map(Replica::toString) - .collect(Collectors.toList()); + return endpoints.stream() + .map(VersionedEndpoints.ForRange::get) + .flatMap(AbstractReplicaCollection::stream) + .map(Replica::toString) + .collect(Collectors.toList()); } public Builder unbuild() { - return new Builder(new HashMap<>(replicaGroups)); + return new Builder(asMap()); } public static Builder builder() @@ -208,13 +262,13 @@ public class PlacementForRange } @VisibleForTesting - public static PlacementForRange splitRangesForPlacement(List<Token> tokens, PlacementForRange placement) + public static ReplicaGroups splitRangesForPlacement(List<Token> tokens, ReplicaGroups placement) { - if (placement.replicaGroups.isEmpty()) + if (placement.ranges.isEmpty()) return placement; - Builder newPlacement = PlacementForRange.builder(); - List<VersionedEndpoints.ForRange> eprs = new ArrayList<>(placement.replicaGroups.values()); + Builder newPlacement = ReplicaGroups.builder(); + List<VersionedEndpoints.ForRange> eprs = new ArrayList<>(placement.endpoints); eprs.sort(Comparator.comparing(a -> a.range().left)); Token min = eprs.get(0).range().left; Token max = eprs.get(eprs.size() - 1).range().right; @@ -372,30 +426,30 @@ public class PlacementForRange return this; } - public PlacementForRange build() + public ReplicaGroups build() { - return new PlacementForRange(this.replicaGroups); + return new ReplicaGroups(this.replicaGroups); } } - public static class Serializer implements PartitionerAwareMetadataSerializer<PlacementForRange> + public static class Serializer implements PartitionerAwareMetadataSerializer<ReplicaGroups> { - public void serialize(PlacementForRange t, DataOutputPlus out, IPartitioner partitioner, Version version) throws IOException + public void serialize(ReplicaGroups t, DataOutputPlus out, IPartitioner partitioner, Version version) throws IOException { - out.writeInt(t.replicaGroups.size()); + out.writeInt(t.ranges.size()); - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : t.replicaGroups.entrySet()) + for (int i = 0; i < t.ranges.size(); i++) { - Range<Token> range = entry.getKey(); - VersionedEndpoints.ForRange efr = entry.getValue(); + Range<Token> range = t.ranges.get(i); + VersionedEndpoints.ForRange efr = t.endpoints.get(i); if (version.isAtLeast(Version.V2)) Epoch.serializer.serialize(efr.lastModified(), out, version); Token.metadataSerializer.serialize(range.left, out, partitioner, version); Token.metadataSerializer.serialize(range.right, out, partitioner, version); out.writeInt(efr.size()); - for (int i = 0; i < efr.size(); i++) + for (int efrIdx = 0; efrIdx < efr.size(); efrIdx++) { - Replica r = efr.get().get(i); + Replica r = efr.get().get(efrIdx); Token.metadataSerializer.serialize(r.range().left, out, partitioner, version); Token.metadataSerializer.serialize(r.range().right, out, partitioner, version); InetAddressAndPort.MetadataSerializer.serializer.serialize(r.endpoint(), out, version); @@ -404,7 +458,7 @@ public class PlacementForRange } } - public PlacementForRange deserialize(DataInputPlus in, IPartitioner partitioner, Version version) throws IOException + public ReplicaGroups deserialize(DataInputPlus in, IPartitioner partitioner, Version version) throws IOException { int groupCount = in.readInt(); Map<Range<Token>, VersionedEndpoints.ForRange> result = Maps.newHashMapWithExpectedSize(groupCount); @@ -438,25 +492,25 @@ public class PlacementForRange EndpointsForRange efr = EndpointsForRange.copyOf(replicas); result.put(range, VersionedEndpoints.forRange(lastModified, efr)); } - return new PlacementForRange(result); + return new ReplicaGroups(result); } - public long serializedSize(PlacementForRange t, IPartitioner partitioner, Version version) + public long serializedSize(ReplicaGroups t, IPartitioner partitioner, Version version) { - long size = sizeof(t.replicaGroups.size()); - for (Map.Entry<Range<Token>, VersionedEndpoints.ForRange> entry : t.replicaGroups.entrySet()) + long size = sizeof(t.ranges.size()); + for (int i = 0; i < t.ranges.size(); i++) { - Range<Token> range = entry.getKey(); - VersionedEndpoints.ForRange efr = entry.getValue(); + Range<Token> range = t.ranges.get(i); + VersionedEndpoints.ForRange efr = t.endpoints.get(i); if (version.isAtLeast(Version.V2)) size += Epoch.serializer.serializedSize(efr.lastModified(), version); size += Token.metadataSerializer.serializedSize(range.left, partitioner, version); size += Token.metadataSerializer.serializedSize(range.right, partitioner, version); size += sizeof(efr.size()); - for (int i = 0; i < efr.size(); i++) + for (int efrIdx = 0; efrIdx < efr.size(); efrIdx++) { - Replica r = efr.get().get(i); + Replica r = efr.get().get(efrIdx); size += Token.metadataSerializer.serializedSize(r.range().left, partitioner, version); size += Token.metadataSerializer.serializedSize(r.range().right, partitioner, version); size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(r.endpoint(), version); @@ -471,14 +525,14 @@ public class PlacementForRange public boolean equals(Object o) { if (this == o) return true; - if (!(o instanceof PlacementForRange)) return false; - PlacementForRange that = (PlacementForRange) o; - return Objects.equals(replicaGroups, that.replicaGroups); + if (!(o instanceof ReplicaGroups)) return false; + ReplicaGroups that = (ReplicaGroups) o; + return Objects.equals(ranges, that.ranges) && Objects.equals(endpoints, that.endpoints); } @Override public int hashCode() { - return Objects.hash(replicaGroups); + return Objects.hash(ranges, endpoints); } } diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java index 898cfabc3b..4b28d7b8aa 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/Move.java +++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java @@ -61,7 +61,7 @@ import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.MovementMap; import org.apache.cassandra.tcm.ownership.PlacementDeltas; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; @@ -334,7 +334,7 @@ public class Move extends MultiStepOperation<Epoch> MovementMap.Builder allMovements = MovementMap.builder(); toStart.forEach((params, delta) -> { RangesByEndpoint targets = delta.writes.additions; - PlacementForRange oldOwners = placements.get(params).reads; + ReplicaGroups oldOwners = placements.get(params).reads; EndpointsByReplica.Builder movements = new EndpointsByReplica.Builder(); targets.flattenValues().forEach(destination -> { SourceHolder sources = new SourceHolder(fd, destination, toSplitRanges.get(params), strictConsistency); diff --git a/src/java/org/apache/cassandra/tcm/sequences/RemoveNodeStreams.java b/src/java/org/apache/cassandra/tcm/sequences/RemoveNodeStreams.java index b5951b6183..d6e8ca3127 100644 --- a/src/java/org/apache/cassandra/tcm/sequences/RemoveNodeStreams.java +++ b/src/java/org/apache/cassandra/tcm/sequences/RemoveNodeStreams.java @@ -39,7 +39,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.MovementMap; import org.apache.cassandra.tcm.ownership.PlacementDeltas; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import static org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT; @@ -121,7 +121,7 @@ public class RemoveNodeStreams implements LeaveStreams RangesByEndpoint startWriteAdditions = startDelta.get(params).writes.additions; RangesByEndpoint startWriteRemovals = startDelta.get(params).writes.removals; // find current placements from the metadata, we need to stream from replicas that are not changed and are therefore not in the deltas - PlacementForRange currentPlacements = metadata.placements.get(params).reads; + ReplicaGroups currentPlacements = metadata.placements.get(params).reads; startWriteAdditions.flattenValues() .forEach(newReplica -> { EndpointsForRange.Builder candidateBuilder = new EndpointsForRange.Builder(newReplica.range()); diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index b160b2600f..f6628d3047 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -78,7 +78,7 @@ import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.utils.Isolated; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.CountDownLatch; @@ -388,7 +388,7 @@ public class ClusterUtils StringBuilder builder = new StringBuilder(); builder.append("'keyspace' { 'name':").append(keyspace.name).append("', "); builder.append("'reads':['"); - PlacementForRange placement = metadata.placements.get(keyspace.params.replication).reads; + ReplicaGroups placement = metadata.placements.get(keyspace.params.replication).reads; builder.append(byEndpoint ? placement.toStringByEndpoint() : placement.toString()); builder.append("'], 'writes':['"); placement = metadata.placements.get(keyspace.params.replication).writes; diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java index 8ece3dc7c8..26d1cff7ac 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java @@ -65,7 +65,7 @@ import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.ownership.VersionedEndpoints; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.TriggerSnapshot; @@ -558,7 +558,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase Set<NodeId> bouncing = new HashSet<>(); Set<NodeId> replicasFromBouncedReplicaSets = new HashSet<>(); outer: - for (VersionedEndpoints.ForRange placements : sut.service.metadata().placements.get(rf.asKeyspaceParams().replication).writes.replicaGroups().values()) + for (VersionedEndpoints.ForRange placements : sut.service.metadata().placements.get(rf.asKeyspaceParams().replication).writes.endpoints) { List<NodeId> replicas = new ArrayList<>(metadata.directory.toNodeIds(placements.get().endpoints())); List<NodeId> bounceCandidates = new ArrayList<>(); @@ -700,12 +700,12 @@ public class MetadataChangeSimulationTest extends CMSTestBase return sb.toString(); } - public static void match(PlacementForRange actual, Map<TokenPlacementModel.Range, List<TokenPlacementModel.Replica>> predicted) throws Throwable + public static void match(ReplicaGroups actual, Map<TokenPlacementModel.Range, List<TokenPlacementModel.Replica>> predicted) throws Throwable { - Map<Range<Token>, VersionedEndpoints.ForRange> actualGroups = actual.replicaGroups(); + Map<Range<Token>, VersionedEndpoints.ForRange> actualGroups = actual.asMap(); assert predicted.size() == actualGroups.size() : String.format("\nPredicted:\n%s(%d)" + - "\nActual:\n%s(%d)", toString(predicted), predicted.size(), toString(actual.replicaGroups()), actualGroups.size()); + "\nActual:\n%s(%d)", toString(predicted), predicted.size(), toString(actualGroups), actualGroups.size()); for (Map.Entry<TokenPlacementModel.Range, List<TokenPlacementModel.Replica>> entry : predicted.entrySet()) { @@ -825,24 +825,28 @@ public class MetadataChangeSimulationTest extends CMSTestBase validatePlacementsInternal(rf, modelState.inFlightOperations, expectedRanges, actualPlacements.writes, true); } - public static void validateTransientStatus(PlacementForRange reads, PlacementForRange writes) + public static void validateTransientStatus(ReplicaGroups reads, ReplicaGroups writes) { // No node should ever be a FULL read replica but a TRANSIENT write replica for the same range Map<Range<Token>, List<Replica>> invalid = new HashMap<>(); - reads.replicaGroups().forEach((range, readGroup) -> { + for (int i = 0; i < reads.ranges.size(); i++) + { + Range<Token> range = reads.ranges.get(i); + VersionedEndpoints.ForRange readGroup = reads.endpoints.get(i); Map<InetAddressAndPort, Replica> writeGroup = writes.forRange(range).get().byEndpoint(); + readGroup.forEach(r -> { if (r.isFull()) { Replica w = writeGroup.get(r.endpoint()); if (w != null && w.isTransient()) { - List<Replica> i = invalid.computeIfAbsent(range, ignore -> new ArrayList<>()); - i.add(w); + List<Replica> replicas = invalid.computeIfAbsent(range, ignore -> new ArrayList<>()); + replicas.add(w); } } }); - }); + } assertTrue(() -> String.format("Found replicas with invalid transient/full status within a given range. " + "The following were found with the same instance having TRANSIENT status for writes, but " + "FULL status for reads, which can cause consistency violations. %n%s", invalid), @@ -854,7 +858,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase Assert.assertEquals(new TreeSet<>(l), new TreeSet<>(r)); } - public static void validatePlacementsInternal(ReplicationFactor rf, List<SimulatedOperation> opStates, List<Range<Token>> expectedRanges, PlacementForRange placements, boolean allowPending) + public static void validatePlacementsInternal(ReplicationFactor rf, List<SimulatedOperation> opStates, List<Range<Token>> expectedRanges, ReplicaGroups placements, boolean allowPending) { int overreplicated = 0; for (Range<Token> range : expectedRanges) diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/OperationalEquivalenceTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/OperationalEquivalenceTest.java index 035cd68a32..85519006cf 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/OperationalEquivalenceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/OperationalEquivalenceTest.java @@ -133,14 +133,14 @@ public class OperationalEquivalenceTest extends CMSTestBase { l.forEach((params, lPlacement) -> { DataPlacement rPlacement = r.get(params); - lPlacement.reads.replicaGroups().forEach((range, lReplicas) -> { + lPlacement.reads.forEach((range, lReplicas) -> { EndpointsForRange rReplicas = rPlacement.reads.forRange(range).get(); Assert.assertEquals(toReplicas(lReplicas.get()), toReplicas(rReplicas)); }); - lPlacement.writes.replicaGroups().forEach((range, lReplicas) -> { + lPlacement.writes.forEach((range, lReplicas) -> { EndpointsForRange rReplicas = rPlacement.writes.forRange(range).get(); Assert.assertEquals(toReplicas(lReplicas.get()), diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java b/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java index 15ee938cf2..c8c2315529 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java @@ -156,8 +156,8 @@ public abstract class SimulatedOperation sutActions.next(); ClusterMetadata m2 = ClusterMetadata.current(); - Map<Range<Token>, VersionedEndpoints.ForRange> after = m2.placements.get(simulatedState.rf.asKeyspaceParams().replication).reads.replicaGroups(); - m1.placements.get(simulatedState.rf.asKeyspaceParams().replication).reads.replicaGroups().forEach((k, beforePlacements) -> { + Map<Range<Token>, VersionedEndpoints.ForRange> after = m2.placements.get(simulatedState.rf.asKeyspaceParams().replication).reads.asMap(); + m1.placements.get(simulatedState.rf.asKeyspaceParams().replication).reads.forEach((k, beforePlacements) -> { if (after.containsKey(k)) { VersionedEndpoints.ForRange afterPlacements = after.get(k); diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/RangeVersioningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/RangeVersioningTest.java index 47a80124e2..35c81a53bb 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/RangeVersioningTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/RangeVersioningTest.java @@ -54,9 +54,7 @@ public class RangeVersioningTest extends FuzzTestBase for (int i = 1; i <= 4; i++) { Epoch smallestSeen = null; - for (VersionedEndpoints.ForRange fr : metadata.placements - .get(ReplicationParams.simple(i)) - .writes.replicaGroups().values()) + for (VersionedEndpoints.ForRange fr : metadata.placements.get(ReplicationParams.simple(i)).writes.endpoints) { if (smallestSeen == null || fr.lastModified().isBefore(smallestSeen)) smallestSeen = fr.lastModified(); diff --git a/test/microbench/org/apache/cassandra/test/microbench/ReplicaGroupsBench.java b/test/microbench/org/apache/cassandra/test/microbench/ReplicaGroupsBench.java new file mode 100644 index 0000000000..7de7a8ef54 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/ReplicaGroupsBench.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; +import org.apache.cassandra.tcm.ownership.PlacementProvider; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.transformations.UnsafeJoin; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +@Warmup(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000) +@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000) +public class ReplicaGroupsBench +{ + static Token [] queryTokens = new Token[5000]; + static Random random = new Random(1); + static ReplicaGroups replicaGroups; +/* +new: ReplicaGroupBench.bench avgt 5 0,317 ± 0,037 ms/op + +old: ReplicaGroupBench.bench avgt 5 10,187 ± 0,040 ms/op + + */ + @Setup(Level.Trial) + public void setup() throws UnknownHostException + { + int nodecount = 1000; + DatabaseDescriptor.daemonInitialization(); + for (int i = 0; i < 5000; i++) + queryTokens[i] = Murmur3Partitioner.instance.getRandomToken(random); + Keyspaces keyspaces = fakeKeyspaces(6); + ReplicationParams params = keyspaces.get("pfrbench").get().params.replication; + replicaGroups = new UniformRangePlacement().calculatePlacements(Epoch.FIRST, fakeMetadata(nodecount), keyspaces).get(params).reads; + } + + @Benchmark + public void bench() + { + for (Token t : queryTokens) + replicaGroups.forRange(t); + } + + public ClusterMetadata fakeMetadata(int nodeCount) throws UnknownHostException + { + ClusterMetadata metadata = new ClusterMetadata(Murmur3Partitioner.instance); + TokenSupplier tokensupplier = TokenSupplier.evenlyDistributedTokens(nodeCount); + PlacementProvider placementProvider = new UniformRangePlacement(); + for (int i = 1; i < nodeCount; i++) + { + ClusterMetadata.Transformer transformer = metadata.transformer(); + UUID uuid = UUID.randomUUID(); + NodeAddresses addresses = addresses(uuid, i); + metadata = transformer.register(addresses, new Location("dc1", "rack1"), NodeVersion.CURRENT).build().metadata; + NodeId nodeId = metadata.directory.peerId(addresses.broadcastAddress); + metadata = new UnsafeJoin(nodeId, Collections.singleton(new Murmur3Partitioner.LongToken(tokensupplier.token(i))), placementProvider).execute(metadata).success().metadata; + } + + return metadata; + } + + NodeAddresses addresses(UUID uuid, int idx) throws UnknownHostException + { + byte [] address = new byte [] {127, 0, + (byte) (((idx + 1) & 0x0000ff00) >> 8), + (byte) ((idx + 1) & 0x000000ff)}; + + InetAddressAndPort host = InetAddressAndPort.getByAddress(address); + return new NodeAddresses(uuid, host, host, host); + } + + public Keyspaces fakeKeyspaces(int rf) + { + KeyspaceMetadata metadata = KeyspaceMetadata.create("pfrbench", KeyspaceParams.simple(rf)); + return Keyspaces.of(metadata); + } + public static void main(String[] args) throws RunnerException + { + Options options = new OptionsBuilder() + .include(ReplicaGroupsBench.class.getSimpleName()) + .build(); + new Runner(options).run(); + } +} diff --git a/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java b/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java index 329770db07..3ff7c03e83 100644 --- a/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java +++ b/test/unit/org/apache/cassandra/tcm/compatibility/GossipHelperTest.java @@ -46,7 +46,7 @@ import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.utils.CassandraVersion; import static org.apache.cassandra.gms.ApplicationState.*; @@ -157,8 +157,8 @@ public class GossipHelperTest assertEquals(entry.getValue(), metadata.tokenMap.tokens(nodeId).iterator().next()); } - PlacementForRange reads = metadata.placements.get(KSM_NTS.params.replication).reads; - PlacementForRange writes = metadata.placements.get(KSM_NTS.params.replication).writes; + ReplicaGroups reads = metadata.placements.get(KSM_NTS.params.replication).reads; + ReplicaGroups writes = metadata.placements.get(KSM_NTS.params.replication).writes; assertEquals(reads, writes); // tokens are // dc1: 1: 1000, 3: 3000, 5: 5000, 6: 7000, 7: 9000 diff --git a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java index f046be7955..c987eff2e8 100644 --- a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java +++ b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java @@ -50,12 +50,12 @@ public class UniformRangePlacementTest // which is either the min or max value in the token space. Any other single token would produce two ranges - // (MIN, t] & (t, MAX] - but because (x, x] denotes a wraparound, this case produces (MIN, MAX] and we need to // verify that we can safely split that when more tokens are introduced. This test supposes MIN = 0, MAX = 100 - PlacementForRange before = PlacementForRange.builder() - .withReplicaGroup(VersionedEndpoints.forRange(Epoch.EMPTY, rg(0, 100, 1, 2, 3))) - .build(); + ReplicaGroups before = ReplicaGroups.builder() + .withReplicaGroup(VersionedEndpoints.forRange(Epoch.EMPTY, rg(0, 100, 1, 2, 3))) + .build(); // existing token is MIN (i.e. 0 for the purposes of this test) List<Token> tokens = ImmutableList.of(token(0), token(30), token(60), token(90)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 30, 1, 2, 3), rg(30, 60, 1, 2, 3), @@ -65,7 +65,7 @@ public class UniformRangePlacementTest // existing token is MAX (i.e. 100 for the purposes of this test). tokens = ImmutableList.of(token(30), token(60), token(90), token(100)); - after = PlacementForRange.splitRangesForPlacement(tokens, before); + after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 30, 1, 2, 3), rg(30, 60, 1, 2, 3), @@ -80,10 +80,10 @@ public class UniformRangePlacementTest // (100,200] : (n1,n2,n3); // (200,300] : (n1,n2,n3); // (300,400] : (n1,n2,n3); - PlacementForRange before = initialPlacement(); + ReplicaGroups before = initialPlacement(); // split (100,200] to (100,150], (150,200] List<Token> tokens = ImmutableList.of(token(100), token(150), token(200), token(300)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 100, 1, 2, 3), rg(100, 150, 1, 2, 3), @@ -99,11 +99,11 @@ public class UniformRangePlacementTest // (100,200] : (n1,n2,n3); // (200,300] : (n1,n2,n3); // (300,400] : (n1,n2,n3); - PlacementForRange before = initialPlacement(); + ReplicaGroups before = initialPlacement(); // split (100,200] to (100,150],(150,200] // and (200,300] to (200,250],(250,300] List<Token> tokens = ImmutableList.of(token(100), token(150), token(200), token(250), token(300)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 100, 1, 2, 3), rg(100, 150, 1, 2, 3), @@ -120,10 +120,10 @@ public class UniformRangePlacementTest // (100,200] : (n1,n2,n3); // (200,300] : (n1,n2,n3); // (300,400] : (n1,n2,n3); - PlacementForRange before = initialPlacement(); + ReplicaGroups before = initialPlacement(); // split (100,200] to (100,125],(125,150],(150,200] List<Token> tokens = ImmutableList.of(token(100), token(125), token(150), token(200), token(300)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 100, 1, 2, 3), rg(100, 125, 1, 2, 3), @@ -136,11 +136,11 @@ public class UniformRangePlacementTest @Test public void testSplitMultipleRangesMultipleTimes() { - PlacementForRange before = initialPlacement(); + ReplicaGroups before = initialPlacement(); // split (100,200] to (100,125],(125,150],(150,200] // and (200,300] to (200,225],(225,250],(250,300] List<Token> tokens = ImmutableList.of(token(100), token(125), token(150), token(200), token(225), token(250), token(300)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 100, 1, 2, 3), rg(100, 125, 1, 2, 3), @@ -159,10 +159,10 @@ public class UniformRangePlacementTest // (100,200] : (n1,n2,n3); // (200,300] : (n1,n2,n3); // (300,400] : (n1,n2,n3); - PlacementForRange before = initialPlacement(); + ReplicaGroups before = initialPlacement(); // split (300,400] to (300,325],(325,350],(350,400] List<Token> tokens = ImmutableList.of(token(100), token(200), token(300), token(325), token(350)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 100, 1, 2, 3), rg(100, 200, 1, 2, 3), @@ -179,10 +179,10 @@ public class UniformRangePlacementTest // (100,200] : (n1,n2,n3); // (200,300] : (n1,n2,n3); // (300,400] : (n1,n2,n3); - PlacementForRange before = initialPlacement(); + ReplicaGroups before = initialPlacement(); // split (0,100] to (0,25],(25,50],(50,100] List<Token> tokens = ImmutableList.of(token(25), token(50), token(100), token(200), token(300)); - PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + ReplicaGroups after = ReplicaGroups.splitRangesForPlacement(tokens, before); assertPlacement(after, rg(0, 25, 1, 2, 3), rg(25, 50, 1, 2, 3), @@ -199,22 +199,22 @@ public class UniformRangePlacementTest rg(100, 200, 1, 2, 3), rg(200, 300, 1, 2, 3), rg(300, 400, 1, 2, 3) }; - PlacementForRange p1 = PlacementForRange.builder() - .withReplicaGroups(Arrays.asList(firstGroups).stream().map(this::v).collect(Collectors.toList())) - .build(); + ReplicaGroups p1 = ReplicaGroups.builder() + .withReplicaGroups(Arrays.asList(firstGroups).stream().map(this::v).collect(Collectors.toList())) + .build(); EndpointsForRange[] secondGroups = { rg(0, 100, 2, 3, 4), rg(100, 200, 2, 3, 5), rg(200, 300, 2, 3, 6), rg(300, 400, 2, 3, 7) }; - PlacementForRange p2 = PlacementForRange.builder() - .withReplicaGroups(Arrays.asList(secondGroups).stream().map(this::v).collect(Collectors.toList())) - .build(); + ReplicaGroups p2 = ReplicaGroups.builder() + .withReplicaGroups(Arrays.asList(secondGroups).stream().map(this::v).collect(Collectors.toList())) + .build(); ReplicationParams params = ReplicationParams.simple(1); DataPlacements map1 = DataPlacements.builder(1).with(params, new DataPlacement(p1, p1)).build(); DataPlacements map2 = DataPlacements.builder(1).with(params, new DataPlacement(p2, p2)).build(); DataPlacement p3 = map1.combineReplicaGroups(map2).get(params); - for (PlacementForRange placement : new PlacementForRange[]{ p3.reads, p3.writes }) + for (ReplicaGroups placement : new ReplicaGroups[]{ p3.reads, p3.writes }) { assertPlacement(placement, rg(0, 100, 1, 2, 3, 4), @@ -235,9 +235,7 @@ public class UniformRangePlacementTest DataPlacement initialPlacement = builder.build(); DataPlacement split = initialPlacement.splitRangesForPlacement(tokens); - assertPlacement(split.writes, rg(-3, -9223372036854775808L, 1), - rg(-9223372036854775808L,-4611686018427387905L, 1), - rg(-4611686018427387905L, -3, 1)); + assertPlacement(split.writes, rg(-9223372036854775808L,-4611686018427387905L, 1), rg(-4611686018427387905L, -3, 1), rg(-3, -9223372036854775808L, 1)); } @Test @@ -254,7 +252,7 @@ public class UniformRangePlacementTest DataPlacement initialPlacement = builder.build(); DataPlacement split = initialPlacement.splitRangesForPlacement(tokens); - assertPlacement(split.writes, rg(3074457345618258602L,-9223372036854775808L, 1), rg(-9223372036854775808L, 3074457345618258602L, 1)); + assertPlacement(split.writes, rg(-9223372036854775808L, 3074457345618258602L, 1), rg(3074457345618258602L,-9223372036854775808L, 1)); } @Test @@ -268,25 +266,25 @@ public class UniformRangePlacementTest DataPlacement initialPlacement = builder.build(); List<Token> tokens = ImmutableList.of(token(Long.MIN_VALUE), token(0)); DataPlacement newPlacement = initialPlacement.splitRangesForPlacement(tokens); - assertEquals(2, newPlacement.writes.replicaGroups.values().size()); + assertEquals(2, newPlacement.writes.size()); } - private PlacementForRange initialPlacement() + private ReplicaGroups initialPlacement() { EndpointsForRange[] initialGroups = { rg(0, 100, 1, 2, 3), rg(100, 200, 1, 2, 3), rg(200, 300, 1, 2, 3), rg(300, 400, 1, 2, 3) }; - PlacementForRange placement = PlacementForRange.builder() - .withReplicaGroups(Arrays.asList(initialGroups).stream().map(this::v).collect(Collectors.toList())) - .build(); + ReplicaGroups placement = ReplicaGroups.builder() + .withReplicaGroups(Arrays.stream(initialGroups).map(this::v).collect(Collectors.toList())) + .build(); assertPlacement(placement, initialGroups); return placement; } - private void assertPlacement(PlacementForRange placement, EndpointsForRange...expected) + private void assertPlacement(ReplicaGroups placement, EndpointsForRange...expected) { - Collection<EndpointsForRange> replicaGroups = placement.replicaGroups.values().stream().map(v -> v.get()).collect(Collectors.toList()); + Collection<EndpointsForRange> replicaGroups = placement.endpoints.stream().map(VersionedEndpoints.ForRange::get).collect(Collectors.toList()); assertEquals(replicaGroups.size(), expected.length); int i = 0; boolean allMatch = true; diff --git a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java index 44b04abcf2..1c1a44296b 100644 --- a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java +++ b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java @@ -47,7 +47,7 @@ import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PlacementDeltas; -import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.ownership.ReplicaGroups; import org.apache.cassandra.tcm.transformations.PrepareJoin; import org.apache.cassandra.tcm.transformations.PrepareLeave; import org.apache.cassandra.tcm.transformations.PrepareReplace; @@ -321,19 +321,19 @@ public class InProgressSequenceCancellationTest first.asMap().forEach((params, placement) -> { DataPlacement otherPlacement = second.get(params); - PlacementForRange r1 = placement.reads; - PlacementForRange r2 = otherPlacement.reads; - assertEquals(r1.replicaGroups().keySet(), r2.replicaGroups().keySet()); - r1.replicaGroups().forEach((range, e1) -> { + ReplicaGroups r1 = placement.reads; + ReplicaGroups r2 = otherPlacement.reads; + assertEquals(r1.ranges, r2.ranges); + r1.forEach((range, e1) -> { EndpointsForRange e2 = r2.forRange(range).get(); assertEquals(e1.size(),e2.size()); assertTrue(e1.get().stream().allMatch(e2::contains)); }); - PlacementForRange w1 = placement.reads; - PlacementForRange w2 = otherPlacement.reads; - assertEquals(w1.replicaGroups().keySet(), w2.replicaGroups().keySet()); - w1.replicaGroups().forEach((range, e1) -> { + ReplicaGroups w1 = placement.reads; + ReplicaGroups w2 = otherPlacement.reads; + assertEquals(w1.ranges, w2.ranges); + w1.forEach((range, e1) -> { EndpointsForRange e2 = w2.forRange(range).get(); assertEquals(e1.size(),e2.size()); assertTrue(e1.get().stream().allMatch(e2::contains)); diff --git a/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java b/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java index 6c0580ea41..eb310cc78b 100644 --- a/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java +++ b/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java @@ -67,7 +67,7 @@ public class SequencesUtils { LockedRanges.AffectedRangesBuilder affected = LockedRanges.AffectedRanges.builder(); placements.asMap().forEach((params, placement) -> { - placement.reads.replicaGroups().keySet().forEach((range) -> { + placement.reads.ranges.forEach((range) -> { if (random.nextDouble() >= 0.6) affected.add(params, range); }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org