This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 41686a54 CASSANALYTICS-50: Add support for vnodes (#93)
41686a54 is described below
commit 41686a548125cfdff57be9cb06593a50b7d98ab6
Author: Andrew Johnson <[email protected]>
AuthorDate: Thu May 8 23:10:11 2025 +0100
CASSANALYTICS-50: Add support for vnodes (#93)
Patch by Andrew Johnson; reviewed by James Berragan, Yifan Cai, Francisco
Guerrero for CASSANALYTICS-50
---
CHANGES.txt | 1 +
.../cassandra/spark/data/CassandraDataLayer.java | 5 ++++
.../distributed/impl/CassandraCluster.java | 9 +++++--
.../cassandra/testing/TestTokenSupplier.java | 4 +--
.../BulkWriteDownInstanceMultipleTokensTest.java | 31 ++++++++++++++++++++++
.../BulkWriteDownSidecarMultipleTokensTest.java | 31 ++++++++++++++++++++++
...CassandraAnalyticsSimpleMultipleTokensTest.java | 31 ++++++++++++++++++++++
...kWriteS3CompatModeSimpleMultipleTokensTest.java | 31 ++++++++++++++++++++++
8 files changed, 139 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 960f892f..04181cc7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Add support for vnodes (CASSANALYTICS-50)
* Add CDC Kafka and Avro codecs module to translate CDC mutations into Avro
format for publication over Kafka (CASSANALYTICS-9)
* Bump Cassandra version to 4.0.17 and integrate with CQLSSTableWriter
notification (CASSANALYTICS-5)
* Bulk writer in S3_COMPAT mode calculates file digest twice
(CASSANALYTICS-19)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 983cd8ca..00f7438f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -340,10 +341,14 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
{
Map<String, PartitionedDataLayer.AvailabilityHint> availabilityHints =
new ConcurrentHashMap<>(ring.size());
+ // Set to ensure a snapshot is only created once per host
+ Set<String> distinctInstances = new HashSet<>();
+
// Fire off create snapshot request across the entire cluster
List<CompletableFuture<Void>> futures =
ring.stream()
.filter(ringEntry -> datacenter == null || datacenter.equals(ringEntry.datacenter()))
+ .filter(ringEntry -> distinctInstances.add(ringEntry.fqdn() + ':' + ringEntry.port()))
.map(ringEntry -> {
PartitionedDataLayer.AvailabilityHint hint =
PartitionedDataLayer.AvailabilityHint.fromState(ringEntry.status(), ringEntry.state());
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index fc082a46..a719539c 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -101,12 +101,17 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension<
.withTokenCount(configuration.tokenCount)
.withDataDirCount(configuration.numDataDirsPerInstance);
+ if (configuration.tokenCount > 1)
+ {
+ clusterBuilder.withVNodes();
+ }
+
TokenSupplier tokenSupplier;
Consumer<IInstanceConfig> instanceConfigUpdater;
if (configuration.partitioner != null)
{
tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(Partitioner.fromClassName(configuration.partitioner),
- nodesPerDc, newNodesPerDc, dcCount, 1);
+ nodesPerDc, newNodesPerDc, dcCount, configuration.tokenCount);
instanceConfigUpdater = instanceConfig -> {
instanceConfig.set("partitioner", configuration.partitioner);
configuration.features.forEach(instanceConfig::with);
@@ -114,7 +119,7 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension<
}
else
{
- tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, 1);
+ tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, configuration.tokenCount);
instanceConfigUpdater = config -> configuration.features.forEach(config::with);
}
if (configuration.additionalInstanceConfig != null)
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java
index e068b024..449072cb 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/TestTokenSupplier.java
@@ -117,14 +117,14 @@ public class TestTokenSupplier
int numTokensPerNode,
BigInteger increment)
{
- List<String>[] tokens = new List[(numNodesPerDC + newNodesPerDC) * numDcs];
+ List<String>[] tokens = new List[(numNodesPerDC + newNodesPerDC) * numDcs * numTokensPerNode];
Arrays.setAll(tokens, ignored -> new ArrayList<>(numTokensPerNode));
BigInteger value = partitioner.minToken.add(BigInteger.ONE);
- int nodeId = 1;
for (int i = 0; i < numTokensPerNode; ++i)
{
+ int nodeId = 1;
while (nodeId <= (numNodesPerDC * numDcs))
{
value = value.add(increment);
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceMultipleTokensTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceMultipleTokensTest.java
new file mode 100644
index 00000000..3351d546
--- /dev/null
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownInstanceMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class BulkWriteDownInstanceMultipleTokensTest extends BulkWriteDownInstanceTest
+{
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .tokenCount(4);
+ }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarMultipleTokensTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarMultipleTokensTest.java
new file mode 100644
index 00000000..57e98920
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDownSidecarMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class BulkWriteDownSidecarMultipleTokensTest extends BulkWriteDownSidecarTest
+{
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .tokenCount(4);
+ }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleMultipleTokensTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleMultipleTokensTest.java
new file mode 100644
index 00000000..b5e9cf78
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/CassandraAnalyticsSimpleMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class CassandraAnalyticsSimpleMultipleTokensTest extends CassandraAnalyticsSimpleTest
+{
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .tokenCount(4);
+ }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleMultipleTokensTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleMultipleTokensTest.java
new file mode 100644
index 00000000..c75ba488
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/testcontainer/BulkWriteS3CompatModeSimpleMultipleTokensTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.testcontainer;
+
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+public class BulkWriteS3CompatModeSimpleMultipleTokensTest extends BulkWriteS3CompatModeSimpleTest
+{
+ @Override
+ protected ClusterBuilderConfiguration testClusterConfiguration()
+ {
+ return super.testClusterConfiguration()
+ .tokenCount(4);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]