This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41686a54 CASSANALYTICS-50: Add support for vnodes (#93)
41686a54 is described below

commit 41686a548125cfdff57be9cb06593a50b7d98ab6
Author: Andrew Johnson <[email protected]>
AuthorDate: Thu May 8 23:10:11 2025 +0100

    CASSANALYTICS-50: Add support for vnodes (#93)
    
    Patch by Andrew Johnson; reviewed by James Berragan, Yifan Cai, Francisco Guerrero for CASSANALYTICS-50
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/data/CassandraDataLayer.java   |  5 ++++
 .../distributed/impl/CassandraCluster.java         |  9 +++++--
 .../cassandra/testing/TestTokenSupplier.java       |  4 +--
 .../BulkWriteDownInstanceMultipleTokensTest.java   | 31 ++++++++++++++++++++++
 .../BulkWriteDownSidecarMultipleTokensTest.java    | 31 ++++++++++++++++++++++
 ...CassandraAnalyticsSimpleMultipleTokensTest.java | 31 ++++++++++++++++++++++
 ...kWriteS3CompatModeSimpleMultipleTokensTest.java | 31 ++++++++++++++++++++++
 8 files changed, 139 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 960f892f..04181cc7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Add support for vnodes (CASSANALYTICS-50)
  * Add CDC Kafka and Avro codecs module to translate CDC mutations into Avro format for publication over Kafka (CASSANALYTICS-9)
  * Bump Cassandra version to 4.0.17 and integrate with CQLSSTableWriter notification (CASSANALYTICS-5)
  * Bulk writer in S3_COMPAT mode calculates file digest twice (CASSANALYTICS-19)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 983cd8ca..00f7438f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -340,10 +341,14 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
     {
         Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints = new ConcurrentHashMap<>(ring.size());
 
+        // Set to ensure a snapshot is only created once per host
+        Set<String> distinctInstances = new HashSet<>();
+
         // Fire off create snapshot request across the entire cluster
         List<CompletableFuture<Void>> futures =
         ring.stream()
             .filter(ringEntry -> datacenter == null || datacenter.equals(ringEntry.datacenter()))
+            .filter(ringEntry -> distinctInstances.add(ringEntry.fqdn() + ':' + ringEntry.port()))
             .map(ringEntry -> {
                 PartitionedDataLayer.AvailabilityHint hint =
                 PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(), ringEntry.state());
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index fc082a46..a719539c 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -101,12 +101,17 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension<
                       .withTokenCount(configuration.tokenCount)
                       .withDataDirCount(configuration.numDataDirsPerInstance);
 
+        if (configuration.tokenCount > 1)
+        {
+            clusterBuilder.withVNodes();
+        }
+
         TokenSupplier tokenSupplier;
         Consumer<IInstanceConfig> instanceConfigUpdater;
         if (configuration.partitioner != null)
         {
             tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(Partitioner.fromClassName(configuration.partitioner),
-                                                                      nodesPerDc, newNodesPerDc, dcCount, 1);
+                                                                      nodesPerDc, newNodesPerDc, dcCount, configuration.tokenCount);
             instanceConfigUpdater = instanceConfig -> {
                 instanceConfig.set("partitioner", configuration.partitioner);
                 configuration.features.forEach(instanceConfig::with);
@@ -114,7 +119,7 @@ public class CassandraCluster<I extends IInstance> 
implements IClusterExtension<
         }
         else
         {
-            tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, 1);
+            tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, configuration.tokenCount);
             instanceConfigUpdater = config -> configuration.features.forEach(config::with);
         }
         if (configuration.additionalInstanceConfig != null)
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java
index e068b024..449072cb 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java
@@ -117,14 +117,14 @@ public class TestTokenSupplier
                                                              int 
numTokensPerNode,
                                                              BigInteger 
increment)
     {
-        List<String>[] tokens = new List[(numNodesPerDC + newNodesPerDC) * numDcs];
+        List<String>[] tokens = new List[(numNodesPerDC + newNodesPerDC) * numDcs * numTokensPerNode];
         Arrays.setAll(tokens, ignored -> new ArrayList<>(numTokensPerNode));
 
         BigInteger value = partitioner.minToken.add(BigInteger.ONE);
 
-        int nodeId = 1;
         for (int i = 0; i < numTokensPerNode; ++i)
         {
+            int nodeId = 1;
             while (nodeId <= (numNodesPerDC * numDcs))
             {
                 value = value.add(increment);
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceMultipleTokensTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceMultipleTokensTest.java
new file mode 100644
index 00000000..3351d546
--- /dev/null
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class BulkWriteDownInstanceMultipleTokensTest extends BulkWriteDownInstanceTest
+{
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .tokenCount(4);
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarMultipleTokensTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarMultipleTokensTest.java
new file mode 100644
index 00000000..57e98920
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class BulkWriteDownSidecarMultipleTokensTest extends BulkWriteDownSidecarTest
+{
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .tokenCount(4);
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleMultipleTokensTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleMultipleTokensTest.java
new file mode 100644
index 00000000..b5e9cf78
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class CassandraAnalyticsSimpleMultipleTokensTest extends CassandraAnalyticsSimpleTest
+{
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .tokenCount(4);
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleMultipleTokensTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleMultipleTokensTest.java
new file mode 100644
index 00000000..c75ba488
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.testcontainer;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class BulkWriteS3CompatModeSimpleMultipleTokensTest extends BulkWriteS3CompatModeSimpleTest
+{
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        return super.testClusterConfiguration()
+                    .tokenCount(4);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to