This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-in-jvm-dtest-api.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c629105  Add support for vnodes in jvm-dtest
c629105 is described below

commit c629105d15a10d6166ddf393dc38d0b0ab87743d
Author: dcapwell <dcapw...@gmail.com>
AuthorDate: Mon Feb 7 15:15:51 2022 -0800

    Add support for vnodes in jvm-dtest
    
    Patch by David Capwell; reviewed by Alex Petrov, Josh McKenzie for CASSANDRA-17332
---
 pom.xml                                            |  6 ++
 .../apache/cassandra/distributed/api/ICluster.java | 11 +++-
 .../cassandra/distributed/api/QueryResults.java    |  2 +-
 .../org/apache/cassandra/distributed/api/Row.java  | 22 +++++--
 .../distributed/api/SimpleQueryResult.java         | 14 ++--
 .../cassandra/distributed/api/TokenSupplier.java   | 41 ++++++++++--
 .../distributed/shared/AbstractBuilder.java        | 58 ++++++++++++++++-
 .../cassandra/distributed/shared/Versions.java     | 10 ++-
 .../distributed/api/TokenSupplierTest.java         | 75 ++++++++++++++++++++++
 9 files changed, 216 insertions(+), 23 deletions(-)

diff --git a/pom.xml b/pom.xml
index 534390e..7dc5603 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,12 @@
             <version>3.5.10</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.quicktheories</groupId>
+            <artifactId>quicktheories</artifactId>
+            <version>0.26</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
index 4af4ae5..f5ff75d 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/ICluster.java
@@ -24,13 +24,14 @@ import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Iterator;
 import java.util.function.BiPredicate;
 import java.util.function.Predicate;
 import java.util.stream.Stream;
 
-public interface ICluster<I extends IInstance> extends AutoCloseable
+public interface ICluster<I extends IInstance> extends AutoCloseable, Iterable<I>
 {
-    public static final String PROPERTY_PREFIX = "cassandra.test";
+    String PROPERTY_PREFIX = "cassandra.test";
 
     void startup();
 
@@ -54,6 +55,12 @@ public interface ICluster<I extends IInstance> extends AutoCloseable
 
     Stream<I> stream(String dcName, String rackName);
 
+    @Override
+    default Iterator<I> iterator()
+    {
+        return stream().iterator();
+    }
+
     IMessageFilters filters();
 
     default void setMessageSink(IMessageSink messageSink) { throw new UnsupportedOperationException(); }
diff --git a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
index 081d06a..b1c5ca7 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/QueryResults.java
@@ -58,7 +58,7 @@ public final class QueryResults
             @Override
             public Row next()
             {
-                row.setResults(iterator.next());
+                row.unsafeSetResults(iterator.next());
                 return row;
             }
         });
diff --git a/src/main/java/org/apache/cassandra/distributed/api/Row.java b/src/main/java/org/apache/cassandra/distributed/api/Row.java
index ff4efbe..5487d5f 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/Row.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/Row.java
@@ -19,10 +19,8 @@
 package org.apache.cassandra.distributed.api;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -62,8 +60,24 @@ public class Row
         this.nameIndex = nameIndex;
     }
 
-    void setResults(Object[] results)
+    public static Row of(Object... results)
     {
+        String[] names = new String[results.length];
+        for (int i = 0; i < names.length; i++)
+            names[i] = "c" + i;
+        Row row = new Row(names);
+        row.setResults(results);
+        return row;
+    }
+
+    void unsafeSetResults(Object[] results)
+    {
+        this.results = results;
+    }
+
+    public void setResults(Object... results)
+    {
+        assert names.length == results.length : "Column names " + Arrays.toString(names) + " does not have the same length as results " + Arrays.toString(results);
         this.results = results;
     }
 
@@ -73,7 +87,7 @@ public class Row
     public Row copy()
     {
         Row copy = new Row(names, nameIndex);
-        copy.setResults(results);
+        copy.unsafeSetResults(results);
         return copy;
     }
 
diff --git a/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
index 04509e2..5e58d37 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/SimpleQueryResult.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.distributed.api;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -59,7 +60,7 @@ import java.util.stream.Stream;
  * points to newer data.  If this behavior is not desirable and access is needed between calls, then {@link Row#copy()}
  * should be used; this will clone the {@link Row} and return a new object pointing to the same data.
  */
-public class SimpleQueryResult implements QueryResult
+public class SimpleQueryResult implements QueryResult, Iterable<Row>
 {
     private final String[] names;
     private final Object[][] results;
@@ -108,13 +109,18 @@ public class SimpleQueryResult implements QueryResult
         return new SimpleQueryResult(names, results, filter.and(fn), offset);
     }
 
+    @Override
+    public Iterator<Row> iterator() {
+        return new SimpleQueryResult(names, results, filter, offset);
+    }
+
     /**
      * Reset the cursor to the start of the query result; if the query result has not been iterated, this has no effect.
      */
     public void reset()
     {
         offset = -1;
-        row.setResults(null);
+        row.unsafeSetResults(null);
     }
 
     /**
@@ -133,13 +139,13 @@ public class SimpleQueryResult implements QueryResult
             return false;
         while ((offset += 1) < results.length)
         {
-            row.setResults(results[offset]);
+            row.unsafeSetResults(results[offset]);
             if (filter.test(row))
             {
                 return true;
             }
         }
-        row.setResults(null);
+        row.unsafeSetResults(null);
         return false;
     }
 
diff --git a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
index ebc921c..96f51bc 100644
--- a/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
+++ b/src/main/java/org/apache/cassandra/distributed/api/TokenSupplier.java
@@ -18,17 +18,44 @@
 
 package org.apache.cassandra.distributed.api;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 public interface TokenSupplier
 {
-    long token(int nodeId);
+    Collection<String> tokens(int nodeId);
+
+    @Deprecated
+    default long token(int nodeId)
+    {
+        Collection<String> tokens = tokens(nodeId);
+        assert tokens.size() == 1: "tokens function returned multiple tokens, only expected 1: " + tokens;
+        return Long.parseLong(tokens.stream().findFirst().get());
+    }
 
+    @Deprecated
     static TokenSupplier evenlyDistributedTokens(int numNodes)
     {
-        long increment = (Long.MAX_VALUE / numNodes) * 2;
-        return (int nodeId) -> {
-            assert nodeId <= numNodes : String.format("Can not allocate a token for a node %s, since only %s nodes are allowed by the token allocation strategy",
-                                                      nodeId, numNodes);
-            return Long.MIN_VALUE + 1 + nodeId * increment;
-        };
+        return evenlyDistributedTokens(numNodes, 1);
+    }
+
+    static TokenSupplier evenlyDistributedTokens(int numNodes, int numTokens)
+    {
+        long increment = (Long.MAX_VALUE / (numNodes * numTokens)) * 2;
+        List<String>[] tokens = new List[numNodes];
+        for (int i = 0; i < numNodes; i++)
+            tokens[i] = new ArrayList<>(numTokens);
+
+        long value = Long.MIN_VALUE + 1;
+        for (int i = 0; i < numTokens; i++)
+        {
+            for (int nodeId = 1; nodeId <= numNodes; nodeId++)
+            {
+                value += increment;
+                tokens[nodeId - 1].add(Long.toString(value));
+            }
+        }
+        return (int nodeId) -> tokens[nodeId - 1];
     }
 }
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
index d3c3494..665cdc5 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/AbstractBuilder.java
@@ -26,13 +26,13 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -64,6 +64,16 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B
     private int datadirCount = 3;
     private final List<Rack> racks = new ArrayList<>();
     private boolean finalised;
+    private int tokenCount = getDefaultTokenCount();
+    private boolean allowVnodes = true;
+
+    protected int getDefaultTokenCount() {
+        String key = "cassandra.dtest.num_tokens";
+        String value = System.getProperty(key);
+        if (value == null)
+            value = System.getenv(key.replace(".", "_").toUpperCase());
+        return value == null ? 1 : Integer.parseInt(value);
+    }
 
     public AbstractBuilder(Factory<I, C, B> factory)
     {
@@ -135,6 +145,14 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B
         return datadirCount;
     }
 
+    public int getTokenCount() {
+        return tokenCount;
+    }
+
+    public boolean isAllowVnodes() {
+        return allowVnodes;
+    }
+
     public C start() throws IOException
     {
         C cluster = createWithoutStarting();
@@ -153,7 +171,7 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B
 
         // TODO: make token allocation strategy configurable
         if (tokenSupplier == null)
-            tokenSupplier = evenlyDistributedTokens(nodeCount);
+            tokenSupplier = evenlyDistributedTokens(nodeCount, tokenCount);
 
         return factory.newCluster((B) this);
     }
@@ -181,6 +199,29 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B
         return (B) this;
     }
 
+    @Deprecated
+    public B withTokenSupplier(SingleTokenSupplier tokenSupplier)
+    {
+        this.tokenSupplier = tokenSupplier;
+        return (B) this;
+    }
+
+    /**
+     * This class is for source backwards compatibility
+     */
+    @Deprecated
+    public interface SingleTokenSupplier extends TokenSupplier
+    {
+        @Override
+        default Collection<String> tokens(int nodeId)
+        {
+            return Collections.singletonList(Long.toString(token(nodeId)));
+        }
+
+        @Override
+        long token(int nodeId);
+    }
+
     public B withSubnet(int subnet)
     {
         this.subnet = subnet;
@@ -339,6 +380,19 @@ public abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B
         return (B) this;
     }
 
+    public B withTokenCount(int tokenCount)
+    {
+        assert tokenCount > 0 : "Token count must be positive; given " + tokenCount;
+        this.tokenCount = tokenCount;
+        return (B) this;
+    }
+
+    public B disallowVNodes()
+    {
+        this.allowVnodes = false;
+        return (B) this;
+    }
+
     private void finaliseBuilder()
     {
         if (finalised)
diff --git a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
index 1fb7149..f12a7b4 100644
--- a/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
+++ b/src/main/java/org/apache/cassandra/distributed/shared/Versions.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -101,11 +102,14 @@ public final class Versions
 
     public Version get(Semver version)
     {
-        return versions.get(first(version))
-                       .stream()
+        Supplier<RuntimeException> onError = () -> new RuntimeException("No version " + version.getOriginalValue() + " found");
+        List<Version> versions = this.versions.get(first(version));
+        if (versions == null)
+            throw onError.get();
+        return versions.stream()
                        .filter(v -> version.equals(v.version))
                        .findFirst()
-                       .orElseThrow(() -> new RuntimeException("No version " + version.getOriginalValue() + " found"));
+                       .orElseThrow(onError);
     }
 
     private static Semver first(Semver version)
diff --git a/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java b/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java
new file mode 100644
index 0000000..d56df1f
--- /dev/null
+++ b/src/test/java/org/apache/cassandra/distributed/api/TokenSupplierTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.api;
+
+import org.junit.jupiter.api.Test;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.quicktheories.QuickTheory.qt;
+
+public class TokenSupplierTest {
+    @Test
+    public void evenlyDistributedTokens() {
+        Gen<Integer> nodeGen = SourceDSL.integers().between(1, 100);
+        Gen<Integer> tokenGen = SourceDSL.integers().between(1, 24);
+        qt().forAll(nodeGen, tokenGen).checkAssert((numNodes, numTokens) -> {
+            TokenSupplier ts = TokenSupplier.evenlyDistributedTokens(numNodes, numTokens);
+            SortedSet<Long> sortedTokens = new TreeSet<>();
+            for (int i = 0; i < numNodes; i++) {
+                Collection<String> tokens = ts.tokens(i + 1);
+                assertThat(tokens).hasSize(numTokens);
+                tokens.forEach(s -> sortedTokens.add(Long.valueOf(s)));
+            }
+            Long previous = null;
+            List<Long> diff = new ArrayList<>(sortedTokens.size() - 1);
+            for (Long token : sortedTokens) {
+                if (previous != null)
+                    diff.add(token - previous);
+                previous = token;
+            }
+
+            assertThat(calculateSD(diff)).isLessThan(1_000);
+        });
+    }
+
+    private static double calculateSD(Collection<Long> values)
+    {
+        if (values.isEmpty())
+            return 0;
+        double sum = 0.0;
+        double sd = 0.0;
+
+        for (double num : values)
+            sum += num;
+
+        double mean = sum / values.size();
+
+        for (double num : values)
+            sd += Math.pow(num - mean, 2);
+
+        return Math.sqrt(sd / values.size());
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to