This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git
The following commit(s) were added to refs/heads/trunk by this push: new c629105 Add support for vnodes in jvm-dtest c629105 is described below commit c629105d15a10d6166ddf393dc38d0b0ab87743d Author: dcapwell <dcapw...@gmail.com> AuthorDate: Mon Feb 7 15:15:51 2022 -0800 Add support for vnodes in jvm-dtest Patch by David Capwell; reviewed by Alex Petrov, Josh McKenzie for CASSANDRA-17332 --- pom.xml | 6 ++ .../apache/cassandra/distributed/api/ICluster.java | 11 +++- .../cassandra/distributed/api/QueryResults.java | 2 +- .../org/apache/cassandra/distributed/api/Row.java | 22 +++++-- .../distributed/api/SimpleQueryResult.java | 14 ++-- .../cassandra/distributed/api/TokenSupplier.java | 41 ++++++++++-- .../distributed/shared/AbstractBuilder.java | 58 ++++++++++++++++- .../cassandra/distributed/shared/Versions.java | 10 ++- .../distributed/api/TokenSupplierTest.java | 75 ++++++++++++++++++++++ 9 files changed, 216 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index 534390e..7dc5603 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,12 @@ <version>3.5.10</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.quicktheories</groupId> + <artifactId>quicktheories</artifactId> + <version>0.26</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java index 4af4ae5..f5ff75d 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java +++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java @@ -24,13 +24,14 @@ import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Iterator; import java.util.function.BiPredicate; import java.util.function.Predicate; import java.util.stream.Stream; -public interface ICluster<I extends IInstance> extends AutoCloseable +public interface ICluster<I extends IInstance> extends AutoCloseable, Iterable<I> { - public static final String PROPERTY_PREFIX = "cassandra.test"; + String PROPERTY_PREFIX = "cassandra.test"; void startup(); @@ -54,6 +55,12 @@ public interface ICluster<I extends IInstance> extends AutoCloseable Stream<I> stream(String dcName, String rackName); + @Override + default Iterator<I> iterator() + { + return stream().iterator(); + } + IMessageFilters filters(); default void setMessageSink(IMessageSink messageSink) { throw new UnsupportedOperationException(); } diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java index 081d06a..b1c5ca7 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java +++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java @@ -58,7 +58,7 @@ public final class QueryResults @Override public Row next() { - row.setResults(iterator.next()); + row.unsafeSetResults(iterator.next()); return row; } }); diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java index ff4efbe..5487d5f 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/Row.java +++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java @@ -19,10 +19,8 @@ package org.apache.cassandra.distributed.api; import java.util.Arrays; -import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -62,8 +60,24 @@ public class Row this.nameIndex = nameIndex; } - void setResults(Object[] results) + public static Row of(Object... results) { + String[] names = new String[results.length]; + for (int i = 0; i < names.length; i++) + names[i] = "c" + i; + Row row = new Row(names); + row.setResults(results); + return row; + } + + void unsafeSetResults(Object[] results) + { + this.results = results; + } + + public void setResults(Object... results) + { + assert names.length == results.length : "Column names " + Arrays.toString(names) + " does not have the same length as results " + Arrays.toString(results); this.results = results; } @@ -73,7 +87,7 @@ public class Row public Row copy() { Row copy = new Row(names, nameIndex); - copy.setResults(results); + copy.unsafeSetResults(results); return copy; } diff --git a/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java index 04509e2..5e58d37 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java +++ b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java @@ -19,6 +19,7 @@ package org.apache.cassandra.distributed.api; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -59,7 +60,7 @@ import java.util.stream.Stream; * points to newer data. If this behavior is not desirable and access is needed between calls, then {@link Row#copy()} * should be used; this will clone the {@link Row} and return a new object pointing to the same data. */ -public class SimpleQueryResult implements QueryResult +public class SimpleQueryResult implements QueryResult, Iterable<Row> { private final String[] names; private final Object[][] results; @@ -108,13 +109,18 @@ public class SimpleQueryResult implements QueryResult return new SimpleQueryResult(names, results, filter.and(fn), offset); } + @Override + public Iterator<Row> iterator() { + return new SimpleQueryResult(names, results, filter, offset); + } + /** * Reset the cursor to the start of the query result; if the query result has not been iterated, this has no effect. */ public void reset() { offset = -1; - row.setResults(null); + row.unsafeSetResults(null); } /** @@ -133,13 +139,13 @@ public class SimpleQueryResult implements QueryResult return false; while ((offset += 1) < results.length) { - row.setResults(results[offset]); + row.unsafeSetResults(results[offset]); if (filter.test(row)) { return true; } } - row.setResults(null); + row.unsafeSetResults(null); return false; } diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java index ebc921c..96f51bc 100644 --- a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java +++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java @@ -18,17 +18,44 @@ package org.apache.cassandra.distributed.api; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + public interface TokenSupplier { - long token(int nodeId); + Collection<String> tokens(int nodeId); + + @Deprecated + default long token(int nodeId) + { + Collection<String> tokens = tokens(nodeId); + assert tokens.size() == 1: "tokens function returned multiple tokens, only expected 1: " + tokens; + return Long.parseLong(tokens.stream().findFirst().get()); + } + @Deprecated static TokenSupplier evenlyDistributedTokens(int numNodes) { - long increment = (Long.MAX_VALUE / numNodes) * 2; - return (int nodeId) -> { - assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy", - nodeId, numNodes); - return Long.MIN_VALUE + 1 + nodeId * increment; - }; + return evenlyDistributedTokens(numNodes, 1); + } + + static TokenSupplier evenlyDistributedTokens(int numNodes, int numTokens) + { + long increment = (Long.MAX_VALUE / (numNodes * numTokens)) * 2; + List<String>[] tokens = new List[numNodes]; + for (int i = 0; i < numNodes; i++) + tokens[i] = new ArrayList<>(numTokens); + + long value = Long.MIN_VALUE + 1; + for (int i = 0; i < numTokens; i++) + { + for (int nodeId = 1; nodeId <= numNodes; nodeId++) + { + value += increment; + tokens[nodeId - 1].add(Long.toString(value)); + } + } + return (int nodeId) -> tokens[nodeId - 1]; } } diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java index d3c3494..665cdc5 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java @@ -26,13 +26,13 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -64,6 +64,16 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B private int datadirCount = 3; private final List<Rack> racks = new ArrayList<>(); private boolean finalised; + private int tokenCount = getDefaultTokenCount(); + private boolean allowVnodes = true; + + protected int getDefaultTokenCount() { + String key = "cassandra.dtest.num_tokens"; + String value = System.getProperty(key); + if (value == null) + value = System.getenv(key.replace(".", "_").toUpperCase()); + return value == null ? 1 : Integer.parseInt(value); + } public AbstractBuilder(Factory<I, C, B> factory) { @@ -135,6 +145,14 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B return datadirCount; } + public int getTokenCount() { + return tokenCount; + } + + public boolean isAllowVnodes() { + return allowVnodes; + } + public C start() throws IOException { C cluster = createWithoutStarting(); @@ -153,7 +171,7 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B // TODO: make token allocation strategy configurable if (tokenSupplier == null) - tokenSupplier = evenlyDistributedTokens(nodeCount); + tokenSupplier = evenlyDistributedTokens(nodeCount, tokenCount); return factory.newCluster((B) this); } @@ -181,6 +199,29 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B return (B) this; } + @Deprecated + public B withTokenSupplier(SingleTokenSupplier tokenSupplier) + { + this.tokenSupplier = tokenSupplier; + return (B) this; + } + + /** + * This class is for source backwards compatability + */ + @Deprecated + public interface SingleTokenSupplier extends TokenSupplier + { + @Override + default Collection<String> tokens(int nodeId) + { + return Collections.singletonList(Long.toString(token(nodeId))); + } + + @Override + long token(int nodeId); + } + public B withSubnet(int subnet) { this.subnet = subnet; @@ -339,6 +380,19 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B return (B) this; } + public B withTokenCount(int tokenCount) + { + assert tokenCount > 0 : "Token count must be positive; given " + tokenCount; + this.tokenCount = tokenCount; + return (B) this; + } + + public B disallowVNodes() + { + this.allowVnodes = false; + return (B) this; + } + private void finaliseBuilder() { if (finalised) diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java index 1fb7149..f12a7b4 100644 --- a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java +++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -101,11 +102,14 @@ public final class Versions public Version get(Semver version) { - return versions.get(first(version)) - .stream() + Supplier<RuntimeException> onError = () -> new RuntimeException("No version " + version.getOriginalValue() + " found"); + List<Version> versions = this.versions.get(first(version)); + if (versions == null) + throw onError.get(); + return versions.stream() .filter(v -> version.equals(v.version)) .findFirst() - .orElseThrow(() -> new RuntimeException("No version " + version.getOriginalValue() + " found")); + .orElseThrow(onError); } private static Semver first(Semver version) diff --git a/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java b/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java new file mode 100644 index 0000000..d56df1f --- /dev/null +++ b/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.api; + +import org.junit.jupiter.api.Test; +import org.quicktheories.core.Gen; +import org.quicktheories.generators.SourceDSL; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.quicktheories.QuickTheory.qt; + +public class TokenSupplierTest { + @Test + public void evenlyDistributedTokens() { + Gen<Integer> nodeGen = SourceDSL.integers().between(1, 100); + Gen<Integer> tokenGen = SourceDSL.integers().between(1, 24); + qt().forAll(nodeGen, tokenGen).checkAssert((numNodes, numTokens) -> { + TokenSupplier ts = TokenSupplier.evenlyDistributedTokens(numNodes, numTokens); + SortedSet<Long> sortedTokens = new TreeSet<>(); + for (int i = 0; i < numNodes; i++) { + Collection<String> tokens = ts.tokens(i + 1); + assertThat(tokens).hasSize(numTokens); + tokens.forEach(s -> sortedTokens.add(Long.valueOf(s))); + } + Long previous = null; + List<Long> diff = new ArrayList<>(sortedTokens.size() - 1); + for (Long token : sortedTokens) { + if (previous != null) + diff.add(token - previous); + previous = token; + } + + assertThat(calculateSD(diff)).isLessThan(1_000); + }); + } + + private static double calculateSD(Collection<Long> values) + { + if (values.isEmpty()) + return 0; + double sum = 0.0; + double sd = 0.0; + + for (double num : values) + sum += num; + + double mean = sum / values.size(); + + for (double num : values) + sd += Math.pow(num - mean, 2); + + return Math.sqrt(sd / values.size()); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org