Re: [I] Feature Request: Support Vector Type [cassandra-gocql-driver]

2024-10-09 Thread via GitHub


tengu-alt commented on issue #1734:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1734#issuecomment-2401579178

   > > > @tengu-alt, did you make progress on vector support? I cannot find a relevant PR open. Do you mind if I submit my nearly completed PoC? Based on your comment from 22 May, I think you are encoding the length of vector elements incorrectly.
   > > 
   > > 
   > > I am currently implementing vector type support. I think the error I mentioned is caused by cqlsh behaving incorrectly.
   > 
   > It looks like @lukasz-antoniak already has a functioning prototype; it might be more efficient to just have him open a PR with his work, depending on how much progress you have made on yours.
   
   Sounds great! I would be glad to see @lukasz-antoniak's PR. I have currently implemented vector-type unmarshalling for nearly all the data types supported by the driver (the vector data has a different serialization). Marshalling still remains, as does the test coverage.
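   
   For context on the length encoding discussed above, here is a minimal standalone sketch of my reading of the vector serialization format (not the driver's actual code): elements of fixed-size subtypes are written back to back, while elements of variable-size subtypes are each prefixed with their byte length as a Cassandra unsigned vint.
   
   ```go
   package main
   
   import "fmt"
   
   // appendUnsignedVint appends v using Cassandra's unsigned vint scheme:
   // the count of leading 1-bits in the first byte tells the reader how
   // many extra bytes follow.
   func appendUnsignedVint(dst []byte, v uint64) []byte {
       n := 1
       for n < 9 && v >= uint64(1)<<(7*uint(n)) {
           n++
       }
       buf := make([]byte, n)
       for i := n - 1; i > 0; i-- {
           buf[i] = byte(v)
           v >>= 8
       }
       buf[0] = byte(v) | ^byte(0)<<uint(9-n) // (n-1) leading 1-bits flag the extra bytes
       return append(dst, buf...)
   }
   
   // marshalVector concatenates the already-serialized elements, adding an
   // unsigned-vint length prefix only for variable-size element types.
   func marshalVector(elements [][]byte, fixedSizeElements bool) []byte {
       var out []byte
       for _, elem := range elements {
           if !fixedSizeElements {
               out = appendUnsignedVint(out, uint64(len(elem)))
           }
           out = append(out, elem...)
       }
       return out
   }
   
   func main() {
       // vector<text, 2> holding "ab" and "cde": text is variable-size,
       // so each element carries a vint length prefix.
       fmt.Printf("% x\n", marshalVector([][]byte{[]byte("ab"), []byte("cde")}, false))
       // Output: 02 61 62 03 63 64 65
   }
   ```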
   





Re: [PR] CASSANDRA-19980: Remove SparkSQL dependency from CassandraBridge so t… [cassandra-analytics]

2024-10-08 Thread via GitHub


yifan-c commented on code in PR #88:
URL: 
https://github.com/apache/cassandra-analytics/pull/88#discussion_r1792282366


##
cassandra-bridge/build.gradle:
##
@@ -40,14 +40,32 @@ publishing {
 
 dependencies {
     api(project(':cassandra-analytics-common'))
-    api(project(':cassandra-analytics-spark-converter'))
-    compileOnly(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
-    compileOnly(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
 
+    implementation "org.slf4j:slf4j-api:${slf4jApiVersion}"
+    compileOnly "com.esotericsoftware:kryo-shaded:${kryoVersion}"
+    compileOnly "com.google.guava:guava:${guavaVersion}"
+    compileOnly "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
+    compileOnly "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
+    compileOnly "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
+
+    testImplementation(project(path: ":cassandra-analytics-spark-converter"))
+    testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
+    testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}")
     testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jacksonVersion}")
     testImplementation(group: 'com.google.guava', name: 'guava', version: '31.1-jre')
     testImplementation(group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26')
     testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
     testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}")
     testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}")
 }
+
+configurations {
+    testArtifacts
+}
+task testJar(type: Jar) {
+    archiveBaseName = "${project.name}-bridge-test"
+    from sourceSets.test.output
+}
+artifacts {
+    testArtifacts testJar
+}

Review Comment:
   How about using test fixtures instead? https://docs.gradle.org/current/userguide/java_testing.html#sec:java_test_fixtures
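   
   For reference, a minimal sketch of the test-fixtures approach (project names follow this build; not part of the patch):
   
   ```groovy
   // In cassandra-bridge/build.gradle: publish shared test helpers with the
   // built-in plugin (sources go under src/testFixtures/java) instead of a
   // hand-rolled testJar artifact.
   plugins {
       id 'java-test-fixtures'
   }
   
   // In a consuming project's build.gradle: depend on the fixtures artifact.
   dependencies {
       testImplementation(testFixtures(project(':cassandra-bridge')))
   }
   ```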



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java:
##
@@ -155,6 +157,11 @@ public CassandraVersion version()
      */
     public abstract CassandraBridge bridge();
 
+    public SparkSqlTypeConverter typeConverter()

Review Comment:
   Add Javadoc.
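   
   For instance, something along these lines (wording is illustrative, inferred from how the method is used in this PR):
   
   ```java
   /**
    * @return the {@link SparkSqlTypeConverter} used to map Cassandra column types
    *         to their Spark SQL equivalents for this data layer's Cassandra version
    */
   public SparkSqlTypeConverter typeConverter()
   ```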



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##
@@ -75,12 +78,32 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features)
         return get(getCassandraVersion(features));
     }
 
+    @NotNull
+    public static SparkSqlTypeConverter getSparkSql(@NotNull CassandraVersionFeatures features)
+    {
+        return getSparkSql(getCassandraVersion(features));
+    }
+

Review Comment:
   Please locate the method together with the other `getSparkSql` overloads. 



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##
@@ -33,12 +33,15 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
 import org.jetbrains.annotations.NotNull;
 
 public final class CassandraBridgeFactory
 {
-    private static final Map<String, CassandraBridge> CASSANDRA_BRIDGES = new ConcurrentHashMap<>(CassandraVersion.values().length);
+    private static final Map<String, Pair<CassandraBridge, SparkSqlTypeConverter>> CASSANDRA_BRIDGES =

Review Comment:
   nit: add a comment to indicate what the key String value represents



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java:
##
@@ -130,14 +153,20 @@ private static String typesResourceName(@NotNull String label)
         return jarResourceName(label, "types");
     }
 
+    @NotNull
+    private static String sparkSqlResourceName(@NotNull String label)
+    {
+        return jarResourceName(label, "sparksql");
+    }
+
     private static String jarResourceName(String... parts)
     {
         return "/bridges/" + String.join("-", parts) + ".jar";
     }
 
     @NotNull
     @SuppressWarnings("unchecked")
-    private static CassandraBridge create(@NotNull String label)
+    private static Pair<CassandraBridge, SparkSqlTypeConverter> create(@NotNull String label)

Review Comment:
   The method signature looks a bit odd: `CassandraBridgeFactory#create` returns a pair.
   
   How about creating a dedicated data class, say `VersionedCassandraBridge`, that wraps both `CassandraBridge` and `SparkSqlTypeConverter`? Then the `create` and `get` methods return the `VersionedCassandraBridge`, and `getSparkSql` may be removed.
   
   ```java
   public class VersionedCassandraBrid
   ```
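   
   The suggestion is truncated above; a minimal sketch of the proposed wrapper, based only on the description in this comment (shape and member names are illustrative), might look like:
   
   ```java
   // Hypothetical sketch of the reviewer's suggestion, not the actual patch;
   // CassandraBridge and SparkSqlTypeConverter are the project's own types.
   public final class VersionedCassandraBridge
   {
       private final CassandraBridge bridge;
       private final SparkSqlTypeConverter typeConverter;
   
       public VersionedCassandraBridge(CassandraBridge bridge, SparkSqlTypeConverter typeConverter)
       {
           this.bridge = bridge;
           this.typeConverter = typeConverter;
       }
   
       public CassandraBridge bridge()
       {
           return bridge;
       }
   
       public SparkSqlTypeConverter typeConverter()
       {
           return typeConverter;
       }
   }
   ```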

Re: [PR] CASSANDRA-19980: Remove SparkSQL dependency from CassandraBridge so t… [cassandra-analytics]

2024-10-08 Thread via GitHub


jberragan commented on PR #88:
URL: 
https://github.com/apache/cassandra-analytics/pull/88#issuecomment-2400445626

   Latest green CI: 
https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/118/workflows/1dc1d879-865f-44b3-8eec-7363529bf51f
   





Re: [I] Feature Request: Support Vector Type [cassandra-gocql-driver]

2024-10-08 Thread via GitHub


joao-r-reis commented on issue #1734:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1734#issuecomment-2400137486

   > > @tengu-alt, did you make progress on vector support? I cannot find a relevant PR open. Do you mind if I submit my nearly completed PoC? Based on your comment from 22 May, I think you are encoding the length of vector elements incorrectly.
   > 
   > I am currently implementing vector type support. I think the error I mentioned is caused by cqlsh behaving incorrectly.
   
   It looks like @lukasz-antoniak already has a functioning prototype; it might be more efficient to just have him open a PR with his work, depending on how much progress you have made on yours.





[PR] CEP-44 Kafka integration for Cassandra CDC using Sidecar [cassandra-analytics]

2024-09-27 Thread via GitHub


jberragan opened a new pull request, #87:
URL: https://github.com/apache/cassandra-analytics/pull/87

   This is the initial commit for CEP-44 to introduce a standalone CDC module into the Analytics project. This module provides the foundation for CDC in the Apache Cassandra Sidecar.
   
   This module provides:
   - a standalone Cdc class as the entrypoint for initializing CDC.
   - pluggable interfaces for listing and reading commit log segments for a token range, persisting and reading CDC state, providing the Cassandra table schema, and optionally reading values from Cassandra.
   - reading and deserializing commit log mutations.
   - reconciling and de-duplicating mutations across replicas.
   - serializing CDC state into a binary object for persistence.
   - a layer for converting Cassandra mutations into a standard consumable format.





Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]

2024-09-24 Thread via GitHub


yifan-c merged PR #86:
URL: https://github.com/apache/cassandra-analytics/pull/86





Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]

2024-09-24 Thread via GitHub


yifan-c commented on code in PR #86:
URL: 
https://github.com/apache/cassandra-analytics/pull/86#discussion_r1774116231


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.token;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
+import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.common.model.NodeStatus;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * ReplicaAwareFailureHandler for a single cluster
+ * The handler should be constructed by {@link MultiClusterReplicaAwareFailureHandler} only, hence package-private
+ * @param <I> CassandraInstance type
+ */
+class SingleClusterReplicaAwareFailureHandler<I extends CassandraInstance> extends ReplicaAwareFailureHandler<I>
+{
+    // failures captured per range; note that failures do not necessarily fail a range, as long as consistency level is considered
+    @GuardedBy("this")
+    private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create();
+
+    @GuardedBy("this")
+    private boolean isEmpty = true;
+
+    @Nullable
+    private String clusterId;
+
+    SingleClusterReplicaAwareFailureHandler(Partitioner partitioner, String clusterId)
+    {
+        this.clusterId = clusterId;
+        rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance());
+    }
+
+    /**
+     * Check whether the handler contains any failure
+     * @return true if there is at least one failure; false otherwise.
+     */
+    public boolean isEmpty()
+    {
+        return isEmpty;
+    }
+
+    @Override
+    public List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange>
+    getFailedRanges(TokenRangeMapping<I> tokenRangeMapping, JobInfo job, ClusterInfo cluster)
+    {
+        return getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor());
+    }
+
+    @Override
+    public synchronized void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage)
+    {
+        RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange);
+        RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create();
+
+        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet())
+        {
+            FailuresPerInstance newErrorMap = entry.getValue().copy();
+            newErrorMap.addErrorForInstance(instance, errMessage);
+            mappingsToAdd.put(entry.getKey(), newErrorMap);
+        }
+        rangeFailuresMap.putAll(mappingsToAdd);
+        isEmpty = false;
+    }
+
+    @Override
+    public synchronized Set<I> getFailedInstances()
+    {
+        if (isEmpty)
+        {
+            return Collections.emptySet();
+        }
+
+        return rangeFailuresMap.asMapOfRanges()
+                               .values()
+                               .stream()
+                               .map(FailuresPerInstance::instances)
+                               .flatMap(Collection::stream)
+                               .collect(Collectors.toSet());
+    }
+
+    @Override
+    protected synchronized List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange>
+    getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping,
+                            ConsistencyLevel cl,
+                            @Nullable String localDC,
+                            ReplicationFactor replicationFactor)
+    {
+        Preconditions.checkArgument((

Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]

2024-09-24 Thread via GitHub


yifan-c commented on code in PR #86:
URL: 
https://github.com/apache/cassandra-analytics/pull/86#discussion_r1774114388


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java:
##

Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]

2024-09-24 Thread via GitHub


frankgh commented on code in PR #86:
URL: 
https://github.com/apache/cassandra-analytics/pull/86#discussion_r1774074153


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java:
##

Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]

2024-09-24 Thread via GitHub


yifan-c commented on code in PR #86:
URL: 
https://github.com/apache/cassandra-analytics/pull/86#discussion_r1773769751


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java:
##
@@ -220,30 +161,7 @@ public void multiMapWorksWithRingInstances()
         assertEquals(1, newErrorMap.keySet().size());
     }
 
-    @Test
-    public void testMultipleFailuresSingleInstanceSucceedRF3()

Review Comment:
   the test is moved into `SingleClusterReplicaAwareFailureHandlerTest`






Re: [I] Donate GoCQL to Apache Cassandra [YOUR ACTION REQUIRED] [cassandra-gocql-driver]

2024-09-23 Thread via GitHub


bcrusu commented on issue #1751:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1751#issuecomment-2370352287

   My contribution was minimal, but I'm happy to donate it. Apologies for the 
late reply!





Re: [PR] [CASSANDRA-19625] Initial Configuration for SonarCube Analysis [cassandra-analytics]

2024-09-20 Thread via GitHub


5 closed pull request #57: [CASSANDRA-19625] Initial Configuration for 
SonarCube Analysis
URL: https://github.com/apache/cassandra-analytics/pull/57





Re: [I] marshalBigInt return 8 bytes slice in all cases except for big.Int, which returns a variable length slice [cassandra-gocql-driver]

2024-09-20 Thread via GitHub


OleksiienkoMykyta commented on issue #1740:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1740#issuecomment-2363705613

   Hello, Felipe!
   This issue is caused by the different bigint implementations in Cassandra and in the driver. Cassandra limits bigint to eight bytes; in gocql, however, it was implemented as an arbitrary-precision type.
   It has already been fixed, thank you for reporting it.
   Have a great day! 
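   
   For illustration, a minimal standalone sketch of the distinction (not the driver's actual code): Cassandra's bigint is always eight big-endian bytes on the wire, while big.Int's raw byte form is variable length.
   
   ```go
   package main
   
   import (
       "encoding/binary"
       "fmt"
       "math/big"
   )
   
   // marshalBigInt encodes a value the way Cassandra's bigint expects:
   // a fixed 8-byte big-endian signed integer.
   func marshalBigInt(v int64) []byte {
       buf := make([]byte, 8)
       binary.BigEndian.PutUint64(buf, uint64(v))
       return buf
   }
   
   func main() {
       fmt.Printf("% x\n", marshalBigInt(42)) // 00 00 00 00 00 00 00 2a
   
       // big.Int has arbitrary precision, and its Bytes() form is variable
       // length: the mismatch reported in this issue.
       fmt.Printf("% x\n", big.NewInt(42).Bytes()) // 2a
   }
   ```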





[PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]

2024-09-18 Thread via GitHub


yifan-c opened a new pull request, #86:
URL: https://github.com/apache/cassandra-analytics/pull/86

   Patch by Yifan Cai; Reviewed by TBD for CASSANDRA-19933





Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]

2024-09-18 Thread via GitHub


yifan-c merged PR #83:
URL: https://github.com/apache/cassandra-analytics/pull/83





Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]

2024-09-18 Thread via GitHub


frankgh commented on code in PR #83:
URL: 
https://github.com/apache/cassandra-analytics/pull/83#discussion_r1765416450


##
scripts/build-sidecar.sh:
##
@@ -24,7 +24,7 @@ else
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-55a9efee30555d3645680c6524043a6c9bc1194b}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-f07e248d0ce8303a06daf93b462190ef7be7304d}"

Review Comment:
   any reason not to move to the current head of Sidecar? 
`7f8db256377ff4e449a83282b2561ce5e7b74adf`



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java:
##
@@ -89,6 +106,20 @@ private void updateCredentials(UUID jobId, StorageCredentialPair credentialPair)
         }
     }
 
+    private void sendCoordinationSignal(UUID jobId, RestoreJobStatus status)
+    {
+        UpdateRestoreJobRequestPayload requestPayload = new UpdateRestoreJobRequestPayload(null, null, status, null);
+        try
+        {
+            // TODO: send to all clusters
+            transportContext.dataTransferApi().updateRestoreJob(requestPayload);
+        }
+        catch (ClientException e)
+        {
+            throw new RuntimeException("Failed to update secretes for restore job. restoreJobId: " + jobId, e);

Review Comment:
   wrong exception verbiage here?



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java:
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.transports.storage.extensions;
+
+import org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation;
+
+/**
+ * Extension methods that enables coordinated write to multiple target clusters
+ * Package-private interface only to be extended by {@link StorageTransportExtension}
+ *
+ * Note that the methods defined in this extension run in Spark Driver only
+ *
+ * The coordinated write has 2 phases, i.e. staging phase and importing phase. In the happy path, the steps of a run are the following:
+ *
+ * Extension sets the {@link CoordinationSignalListener} on initialization.
+ * Extension invokes {@link CoordinationSignalListener#onStageReady(String)}, when it decides it is time to stage SSTables on all clusters.
+ * Cassandra Analytics calls Sidecars to stage data.
+ * {@link #onStageSucceeded(String, long, long, long)} is called per cluster to notify the extension.
+ * Extension invokes {@link CoordinationSignalListener#onApplyReady(String)}, when it decides it is time to apply/import SSTables on all clusters.
+ * ECassandra Analytics calls Sidecars to import data.

Review Comment:
   ```suggestion
* Cassandra Analytics calls Sidecars to import data.
   ```



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.transports.storage.extensions;
+
+/**
+ * A listener interface that receives coordination signals.
+ * It works in cooperation with {@link CoordinatedTransportExtension} to

Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]

2024-09-18 Thread via GitHub


yifan-c commented on code in PR #83:
URL: 
https://github.com/apache/cassandra-analytics/pull/83#discussion_r1765431422


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java:
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.transports.storage.extensions;
+
+/**
+ * A listener interface that receives coordination signals.
+ * It works in cooperation with {@link CoordinatedTransportExtension} to enable coordinated write

Review Comment:
   the feature name is "coordinated write"






Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]

2024-09-18 Thread via GitHub


yifan-c commented on code in PR #83:
URL: 
https://github.com/apache/cassandra-analytics/pull/83#discussion_r1765428720


##
scripts/build-sidecar.sh:
##
@@ -24,7 +24,7 @@ else
   SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
   
SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}";
   SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}"
-  SIDECAR_COMMIT="${SIDECAR_COMMIT:-55a9efee30555d3645680c6524043a6c9bc1194b}"
+  SIDECAR_COMMIT="${SIDECAR_COMMIT:-f07e248d0ce8303a06daf93b462190ef7be7304d}"

Review Comment:
   Thanks. I was not aware that there was a new commit. 






Re: [PR] Ninja fix for CASSANDRA-19815 to publish jar for cassandra-analytics-… [cassandra-analytics]

2024-09-18 Thread via GitHub


yifan-c closed pull request #85: Ninja fix for CASSANDRA-19815 to publish jar 
for cassandra-analytics-…
URL: https://github.com/apache/cassandra-analytics/pull/85





[PR] Ninja fix for CASSANDRA-19815 to publish jar for cassandra-analytics-… [cassandra-analytics]

2024-09-18 Thread via GitHub


jberragan opened a new pull request, #85:
URL: https://github.com/apache/cassandra-analytics/pull/85

   …spark-converter





Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-18 Thread via GitHub


yifan-c merged PR #80:
URL: https://github.com/apache/cassandra-analytics/pull/80





Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-18 Thread via GitHub


JeetKunDoug commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1765194312


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.math.BigInteger;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CassandraClusterInfoTest
+{
+    @Test
+    void testTimeSkewAcceptable()
+    {
+        Instant localNow = Instant.now();
+        int allowanceMinutes = 10;
+        Instant remoteNow = localNow.plus(Duration.ofMinutes(1));
+        CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow);
+        ci.validateTimeSkewWithLocalNow(Range.openClosed(BigInteger.valueOf(10), BigInteger.valueOf(20)), localNow);
+    }
+
+    @Test
+    void testTimeSkewTooLarge()
+    {
+        Instant localNow = Instant.now();
+        int allowanceMinutes = 10;
+        Instant remoteNow = localNow.plus(Duration.ofMinutes(11)); // 11 > allowanceMinutes
+        CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow);
+        assertThatThrownBy(() -> ci.validateTimeSkewWithLocalNow(Range.openClosed(BigInteger.valueOf(10), BigInteger.valueOf(20)), localNow))
+        .isExactlyInstanceOf(TimeSkewTooLargeException.class);
+    }

Review Comment:
   NIT: I'm not crazy about tests that have no asserts, even if it's really just "this doesn't throw" - a suggestion to make the test just a bit more explicit about what it's testing:
   
   ```suggestion
   public static final Range<BigInteger> RANGE = Range.openClosed(BigInteger.valueOf(10), BigInteger.valueOf(20));
   
   @Test
   void testTimeSkewAcceptable()
   {
       Instant localNow = Instant.now();
       int allowanceMinutes = 10;
       Instant remoteNow = localNow.plus(Duration.ofMinutes(1));
       CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow);
       assertThatNoException().describedAs("Acceptable time skew should validate without exception")
                              .isThrownBy(() -> ci.validateTimeSkewWithLocalNow(RANGE, localNow));
   }
   
   @Test
   void testTimeSkewTooLarge()
   {
       Instant localNow = Instant.now();
       int allowanceMinutes = 10;
       Instant remoteNow = localNow.plus(Duration.ofMinutes(11)); // 11 > allowanceMinutes
       CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow);
       assertThatException().describedAs("Time skew with too large a value should throw TimeSkewTooLargeException")
                            .isThrownBy(() -> ci.validateTimeSkewWithLocalNow(RANGE, localNow))
                            .isExactlyInstanceOf(TimeSkewTooLargeException.class);
   }
   ```






Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


frankgh merged PR #84:
URL: https://github.com/apache/cassandra-analytics/pull/84





Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


jberragan commented on PR #84:
URL: 
https://github.com/apache/cassandra-analytics/pull/84#issuecomment-2356869431

   Green CI: 
https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/107/workflows/9cb134d8-67c2-404b-b4f8-88bacbef36dc





Re: [I] Trying to get cassandra active, idle, inUse connection [cassandra-gocql-driver]

2024-09-17 Thread via GitHub


md-robiul-hassan-kr commented on issue #1774:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1774#issuecomment-2356860789

   Hi @testisnullus, your assumption is correct. I want to be able to know how many active, in-use, or idle connections I have to the database at a given time, so that I can use those metrics. 
   
   A good example is [here](https://docs.datastax.com/en/developer/java-driver/4.9/manual/core/metrics/index.html), where those metrics are available through the JVM, and I was wondering if there could be a possibility to expose the same from the gocql lib. 
   
   ```
   datastax-java-driver.advanced.metrics {
 session.enabled = [ connected-nodes, cql-requests ]
 node.enabled = [ pool.open-connections, pool.in-flight ]
   }
   ```
   
   I would like to get the information for the connection pool mostly, so that it can be used in Grafana with Prometheus. 
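   
   For illustration, a minimal sketch of the consumption side (gocql does not currently expose such a counter, which is the point of this issue; `openConnections` below is a hypothetical stand-in for what the driver would need to provide):
   
   ```go
   package main
   
   import (
       "log"
       "net/http"
       "sync/atomic"
   
       "github.com/prometheus/client_golang/prometheus"
       "github.com/prometheus/client_golang/prometheus/promhttp"
   )
   
   // openConnections stands in for the pool counter the driver would expose.
   var openConnections atomic.Int64
   
   func main() {
       // Publish the hypothetical pool size as a Prometheus gauge so it can
       // be scraped and graphed in Grafana.
       prometheus.MustRegister(prometheus.NewGaugeFunc(
           prometheus.GaugeOpts{
               Name: "cassandra_pool_open_connections",
               Help: "Open connections to Cassandra (hypothetical source).",
           },
           func() float64 { return float64(openConnections.Load()) },
       ))
   
       http.Handle("/metrics", promhttp.Handler())
       log.Fatal(http.ListenAndServe(":2112", nil))
   }
   ```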





Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


yifan-c commented on code in PR #84:
URL: 
https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763758614


##
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java:
##
@@ -59,15 +64,32 @@ public class SSTableCache
                                                                  propOrDefault("sbr.cache.stats.expireAfterMins", 60));
     private final Cache<SSTable, BloomFilter> filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384),
                                                                   propOrDefault("sbr.cache.filter.expireAfterMins", 60));
+    private final Cache<SSTable, Optional<CompressionMetadata>> compressionMetadata = buildCache(propOrDefault("sbr.cache.compressionInfo.maxEntries", 128),

Review Comment:
   Now looking at the GitHub comment, I realize that it can be confusing. I meant to ask for a Java code comment explaining the cache value. 






Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


yifan-c commented on code in PR #84:
URL: 
https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763750927


##
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java:
##
@@ -20,10 +20,13 @@
 package org.apache.cassandra.spark.reader;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import com.google.common.cache.Cache;

Review Comment:
   As a potential follow-up, can we migrate to Caffeine for caching?
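   
   For reference, the Caffeine equivalent of the Guava builder used in this class is nearly drop-in (a sketch, not part of the patch; `Object` stands in for the project's `SSTable` and `CompressionMetadata` types, and the sizes mirror the defaults above):
   
   ```java
   import java.util.Optional;
   import java.util.concurrent.TimeUnit;
   import com.github.benmanes.caffeine.cache.Cache;
   import com.github.benmanes.caffeine.cache.Caffeine;
   
   class SSTableCacheSketch
   {
       // Same shape as the Guava-based cache fields in SSTableCache.
       final Cache<Object, Optional<Object>> compressionMetadata =
               Caffeine.newBuilder()
                       .maximumSize(128)
                       .expireAfterAccess(60, TimeUnit.MINUTES)
                       .build();
   }
   ```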



##
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java:
##
@@ -59,15 +64,32 @@ public class SSTableCache
                                                                  propOrDefault("sbr.cache.stats.expireAfterMins", 60));
     private final Cache<SSTable, BloomFilter> filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384),
                                                                   propOrDefault("sbr.cache.filter.expireAfterMins", 60));
+    private final Cache<SSTable, Optional<CompressionMetadata>> compressionMetadata = buildCache(propOrDefault("sbr.cache.compressionInfo.maxEntries", 128),

Review Comment:
   Can you comment why the cache value is `Optional`?






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-17 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763681330


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroupTest.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils;
+import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CassandraClusterInfoGroupTest
+{
+    @Test
+    void testCreateGroupFailWithEmptyList()
+    {
+        assertThatThrownBy(() -> new CassandraClusterInfoGroup(null))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessage("clusterInfos cannot be null or empty");
+
+        assertThatThrownBy(() -> new CassandraClusterInfoGroup(Collections.emptyList()))
+        .isExactlyInstanceOf(IllegalArgumentException.class)
+        .hasMessage("clusterInfos cannot be null or empty");
+    }
+
+    @Test
+    void testLookupCluster()
+    {
+        CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index));
+        List<ClusterInfo> clusters = group.clusters();
+        assertThat(clusters).hasSize(2);
+        assertThat(group.cluster("cluster0")).isSameAs(clusters.get(0));
+        assertThat(group.cluster("cluster1")).isSameAs(clusters.get(1));
+        assertThat(group.cluster("cluster2")).isNull();
+    }
+
+    @Test
+    void testClusterId()
+    {
+        CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index));
+        assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0, cluster1]");
+
+        group = mockClusterGroup(1, index -> mockClusterInfo("cluster" + index));
+        assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0]");
+    }
+
+    @Test
+    void testDelegationOfSingleCluster()
+    {
+        CassandraClusterInfo clusterInfo = mockClusterInfo("cluster0");
+        TokenRangeReplicasResponse response = TokenRangeMappingUtils.mockSimpleTokenRangeReplicasResponse(10, 3);
+        TokenRangeMapping<RingInstance> expectedTokenRangeMapping = TokenRangeMapping.create(() -> response,
+                                                                                             () -> Partitioner.Murmur3Partitioner,
+                                                                                             RingInstance::new);
+        when(clusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(expectedTokenRangeMapping);
+        when(clusterInfo.timeSkew(any())).thenReturn(mock(TimeSkewResponse.class));
+        when(clusterInfo.getLowestCassandraVersion()).thenReturn("lowestCassandraVersion");
+        when(clusterInfo.clusterWriteAvailability()).thenReturn(Collections.emptyMap());
+        CassandraClusterInfoGroup group = mockClusterGroup(1, index -> clusterInfo);
+        // Since there is a single clusterInfo in the group. It behaves as a 

Re: [PR] CASSANDRA-19927: Deprecate old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


yifan-c commented on code in PR #84:
URL: 
https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763634919


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java:
##
@@ -52,11 +52,12 @@
 public class SidecarProvisionedSSTable extends SSTable
 {
     private static final long serialVersionUID = 6452703925812602832L;
+    @Deprecated // use SSTableCache
     @VisibleForTesting
     public static final Cache COMPRESSION_CACHE = CacheBuilder.newBuilder()
-                                                              .expireAfterAccess(1, TimeUnit.HOURS)
-                                                              .maximumSize(2048)
-                                                              .build();
+                                                          .expireAfterAccess(1, TimeUnit.HOURS)
+                                                          .maximumSize(2048)
+                                                          .build();

Review Comment:
   Can we just remove it? 






Re: [PR] CASSANDRA-19927: Deprecate old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


yifan-c commented on code in PR #84:
URL: 
https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763624222


##
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java:
##
@@ -108,6 +111,27 @@ public BloomFilter bloomFilter(@NotNull SSTable ssTable, Descriptor descriptor)
         return get(filter, ssTable, () -> ReaderUtils.readFilter(ssTable, descriptor.version.hasOldBfFormat()));
     }
 
+    public CompressionMetadata compressionMetaData(@NotNull SSTable ssTable, boolean hasMaxCompressedLength) throws IOException
+    {
+        if (!"true".equalsIgnoreCase(System.getProperty("sbr.cache.compression.enabled", "true")))

Review Comment:
   How about naming the property `sbr.cache.compressionInfo.enabled`, matching the name of the component (compressionInfo)?
   
   I think there are utils to read from a map, but none for system properties yet. We should refactor to unify the SBW and SBR code further; it is a future action item, just mentioning it here.
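   
   A minimal sketch of such a helper, mirroring the MapUtils style; the `PropertyUtils` class name and its location are assumptions for illustration, not existing code:
   
   ```java
   public final class PropertyUtils
   {
       private PropertyUtils()
       {
       }
   
       // Read a boolean system property, falling back to the default when the property is unset
       public static boolean getBoolean(String key, boolean defaultValue)
       {
           String value = System.getProperty(key);
           return value == null ? defaultValue : Boolean.parseBoolean(value);
       }
   }
   ```
   
   The check above would then read `if (!PropertyUtils.getBoolean("sbr.cache.compressionInfo.enabled", true))`.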



##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java:
##
@@ -40,7 +40,9 @@ public final class Properties
     public static final int DEFAULT_MAX_RETRIES = 10;
     public static final long DEFAULT_MILLIS_TO_SLEEP = 500;
     public static final int DEFAULT_MAX_POOL_SIZE = 64;
-    public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = true;
+    @Deprecated
+    public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = false; // use org.apache.cassandra.spark.reader.SSTable cache
+    @Deprecated
     public static final long DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES = 8 * MEBI_BYTES; // 8MiB

Review Comment:
   Let's simply remove the fields that are newly introduced in the recent patch 
of CASSANDRA-19900



##
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java:
##
@@ -628,22 +627,19 @@ public class SSTableStreamReader implements ISSTableScanner
         SSTableStreamReader() throws IOException
         {
             lastToken = sparkRangeFilter != null ? sparkRangeFilter.tokenRange().upperEndpoint() : null;
-            try (@Nullable InputStream compressionInfoInputStream = ssTable.openCompressionStream())
-            {
-                DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream());
+            @Nullable CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetaData(ssTable, version.hasMaxCompressedLength());

Review Comment:
   nit: `metadata` is often used as a single word






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-17 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763626933


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java:
##
@@ -161,40 +160,22 @@ public void checkBulkWriterIsEnabledOrThrow()
     }
 
     /**
-     * @return the largest time skew retrieved from the target replicas
+     * @return the largest time skew retrieved from the target clusters
      */
     @Override
-    public TimeSkewResponse getTimeSkew(List instances)
+    public TimeSkewResponse timeSkew(Range range)
     {
         if (clusterInfos.size() == 1)
         {
-            return clusterInfos.get(0).getTimeSkew(instances);
+            return clusterInfos.get(0).timeSkew(range);
         }
 
-        Map> instancesByClusterId = instances.stream().collect(Collectors.groupingBy(instance -> {
-            String clusterId = instance.clusterId();
-            Preconditions.checkState(clusterId != null,
-                                     "RingInstance must define its clusterId for coordinated write");
-            return clusterId;
-        }));
-        long localNow = System.currentTimeMillis();
-        long maxDiff = 0;
-        TimeSkewResponse largestSkew = null;
-        for (Map.Entry> entry : instancesByClusterId.entrySet())
-        {
-            String clusterId = entry.getKey();
-            List instancesOfCluster = entry.getValue();
-            ClusterInfo clusterInfo = cluster(clusterId);
-            Preconditions.checkState(clusterInfo != null, "ClusterInfo not found with clusterId: " + clusterId);
-            TimeSkewResponse response = clusterInfo.getTimeSkew(instancesOfCluster);
-            long d = Math.abs(response.currentTime - localNow);
-            if (Math.abs(response.currentTime - localNow) > maxDiff)
-            {
-                maxDiff = d;
-                largestSkew = response;
-            }
-        }
-        return largestSkew;
+        return clusterInfos.stream()
+                           .map(clusterInfo -> clusterInfo.timeSkew(range))
+                           // Find the timeSkew with the lowest remote currentTime, i.e. largest difference with the local current time.

Review Comment:
   Talked offline. We are going to refactor the method to do the validation, 
instead of exposing a response object. 
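   
   A sketch of that direction, assuming the shaded `TimeSkewResponse` exposes `currentTime` and `allowableSkewInMinutes` as in the diff above; the method name and exception choice are illustrative only:
   
   ```java
   // Validate instead of returning a response: fail fast when any cluster's
   // clock differs from the local clock by more than that cluster allows.
   public void validateTimeSkew(Range<BigInteger> range)
   {
       for (ClusterInfo clusterInfo : clusterInfos)
       {
           TimeSkewResponse response = clusterInfo.timeSkew(range);
           long localNow = System.currentTimeMillis();
           long allowableSkewMillis = TimeUnit.MINUTES.toMillis(response.allowableSkewInMinutes);
           if (Math.abs(response.currentTime - localNow) > allowableSkewMillis)
           {
               throw new IllegalStateException("Time skew is too large for cluster: " + clusterInfo.clusterId());
           }
       }
   }
   ```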






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-17 Thread via GitHub


JeetKunDoug commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763592085


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java:
##
@@ -161,40 +160,22 @@ public void checkBulkWriterIsEnabledOrThrow()
     }
 
     /**
-     * @return the largest time skew retrieved from the target replicas
+     * @return the largest time skew retrieved from the target clusters
      */
     @Override
-    public TimeSkewResponse getTimeSkew(List instances)
+    public TimeSkewResponse timeSkew(Range range)
     {
         if (clusterInfos.size() == 1)
         {
-            return clusterInfos.get(0).getTimeSkew(instances);
+            return clusterInfos.get(0).timeSkew(range);
         }
 
-        Map> instancesByClusterId = instances.stream().collect(Collectors.groupingBy(instance -> {
-            String clusterId = instance.clusterId();
-            Preconditions.checkState(clusterId != null,
-                                     "RingInstance must define its clusterId for coordinated write");
-            return clusterId;
-        }));
-        long localNow = System.currentTimeMillis();
-        long maxDiff = 0;
-        TimeSkewResponse largestSkew = null;
-        for (Map.Entry> entry : instancesByClusterId.entrySet())
-        {
-            String clusterId = entry.getKey();
-            List instancesOfCluster = entry.getValue();
-            ClusterInfo clusterInfo = cluster(clusterId);
-            Preconditions.checkState(clusterInfo != null, "ClusterInfo not found with clusterId: " + clusterId);
-            TimeSkewResponse response = clusterInfo.getTimeSkew(instancesOfCluster);
-            long d = Math.abs(response.currentTime - localNow);
-            if (Math.abs(response.currentTime - localNow) > maxDiff)
-            {
-                maxDiff = d;
-                largestSkew = response;
-            }
-        }
-        return largestSkew;
+        return clusterInfos.stream()
+                           .map(clusterInfo -> clusterInfo.timeSkew(range))
+                           // Find the timeSkew with the lowest remote currentTime, i.e. largest difference with the local current time.

Review Comment:
   This isn't really right, as it would be possible for the current time on a 
remote host to be much _greater_ than the current time on the host running the 
task. We really need to do the whole "take the absolute value of the difference 
between the times" thing, not just look for the min currentTime.
   
   I realize also that for multi-cluster scenarios, the other field in the 
TimeSkewResponse (`allowableSkewInMinutes`) can, in fact, be different 
(although in theory I'd doubt that it would be). We should really grab 2 things:
   1) The time skew response with the maximum _difference_ between localNow and 
the remote time, where localNow is calculated each time you get a response back 
(per my previous suggestion)
   2) The _minimum_ `allowableSkewInMinutes` value
   
   And then craft the "combined" time skew response from those two things.
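   
   A sketch of that combining logic, using the same field access as the diff above; the `TimeSkewResponse(currentTime, allowableSkewInMinutes)` constructor is an assumption for illustration:
   
   ```java
   // Keep the remote time with the largest absolute difference from a freshly
   // read local clock, and the smallest allowable skew across all clusters.
   TimeSkewResponse combineTimeSkews(List<ClusterInfo> clusterInfos, Range<BigInteger> range)
   {
       long maxDiff = -1L;
       long remoteTimeAtMaxDiff = 0L;
       int minAllowableSkewInMinutes = Integer.MAX_VALUE;
       for (ClusterInfo clusterInfo : clusterInfos)
       {
           TimeSkewResponse response = clusterInfo.timeSkew(range);
           long localNow = System.currentTimeMillis(); // re-read per response, as suggested
           long diff = Math.abs(response.currentTime - localNow);
           if (diff > maxDiff)
           {
               maxDiff = diff;
               remoteTimeAtMaxDiff = response.currentTime;
           }
           minAllowableSkewInMinutes = Math.min(minAllowableSkewInMinutes, response.allowableSkewInMinutes);
       }
       return new TimeSkewResponse(remoteTimeAtMaxDiff, minAllowableSkewInMinutes);
   }
   ```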



##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroupTest.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils;
+import org.apache.cassandra.spark.bulkwriter.WriteAv

[PR] Deprecate old compression cache and move to using cache of Compressio… [cassandra-analytics]

2024-09-17 Thread via GitHub


jberragan opened a new pull request, #84:
URL: https://github.com/apache/cassandra-analytics/pull/84

   …nMetadata, so that:
   
   - we no longer cache an entire byte array on heap
   - we cache and re-use the CompressionMetadata object so that only one 
BigLongArray object is allocated for the chunk offsets
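   
   A minimal sketch of that change, reusing the same Guava CacheBuilder settings as the old cache; the `cacheKey` and `readCompressionMetadata` helpers are assumptions for illustration:
   
   ```java
   private static final Cache<String, CompressionMetadata> COMPRESSION_METADATA_CACHE =
           CacheBuilder.newBuilder()
                       .expireAfterAccess(1, TimeUnit.HOURS)
                       .maximumSize(2048)
                       .build();
   
   // Cache the parsed CompressionMetadata instead of the raw byte[], so the
   // BigLongArray of chunk offsets is allocated once per SSTable.
   CompressionMetadata compressionMetadata(SSTable ssTable, boolean hasMaxCompressedLength) throws IOException
   {
       try
       {
           return COMPRESSION_METADATA_CACHE.get(cacheKey(ssTable), // hypothetical key helper
                                                 () -> readCompressionMetadata(ssTable, hasMaxCompressedLength));
       }
       catch (java.util.concurrent.ExecutionException exception)
       {
           throw new IOException(exception.getCause());
       }
   }
   ```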





Re: [PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]

2024-09-16 Thread via GitHub


jberragan commented on PR #81:
URL: 
https://github.com/apache/cassandra-analytics/pull/81#issuecomment-2353411319

   Merged in.





Re: [PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]

2024-09-16 Thread via GitHub


jberragan closed pull request #81: Ninja fix for CASSANDRA-19815
URL: https://github.com/apache/cassandra-analytics/pull/81





Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]

2024-09-13 Thread via GitHub


yifan-c commented on PR #83:
URL: 
https://github.com/apache/cassandra-analytics/pull/83#issuecomment-2350465869

   After #80 





[PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]

2024-09-13 Thread via GitHub


yifan-c opened a new pull request, #83:
URL: https://github.com/apache/cassandra-analytics/pull/83

   (no comment)





Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-13 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1759270780


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java:
##
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
+import org.apache.cassandra.bridge.CassandraVersionFeatures;
+import org.apache.cassandra.spark.bulkwriter.CassandraContext;
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.MapUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class);
+
+    private static final long serialVersionUID = 5337884321245616172L;
+
+    // immutable
+    private final List clusterInfos;
+    private transient volatile Map clusterInfoById;
+    private transient volatile TokenRangeMapping consolidatedTokenRangeMapping;
+
+    public CassandraClusterInfoGroup(List clusterInfos)
+    {
+        Preconditions.checkArgument(clusterInfos != null && !clusterInfos.isEmpty(),
+                                    "clusterInfos cannot be null or empty");
+        this.clusterInfos = Collections.unmodifiableList(clusterInfos);
+        buildClusterInfoById();
+    }
+
+    @Override
+    public void refreshClusterInfo()
+    {
+        runOnEach(ClusterInfo::refreshClusterInfo);
+    }
+
+    @Override
+    public TokenRangeMapping getTokenRangeMapping(boolean cached)
+    {
+        if (clusterInfos.size() == 1)
+        {
+            return clusterInfos.get(0).getTokenRangeMapping(cached);
+        }
+
+        if (!cached || consolidatedTokenRangeMapping == null)
+        {
+            synchronized (this)
+            {
+                // return immediately if consolidatedTokenRangeMapping has been initialized and call-site asks for the cached value
+                if (cached && consolidatedTokenRangeMapping != null)
+                {
+                    return consolidatedTokenRangeMapping;
+                }
+                Map> aggregated = applyOnEach(c -> c.getTokenRangeMapping(cached));
+                consolidatedTokenRangeMapping = TokenRangeMapping.consolidate(new ArrayList<>(aggregated.values()));
+            }
+        }
+
+        return consolidatedTokenRangeMapping;
+    }
+
+    /**
+     * @return the lowest cassandra version among all clusters
+     */
+    @Override
+    public String getLowestCassandraVersion()
+    {
+        if (clusterInfos.size() == 1)
+        {
+            return clusterInfos.get(0).getLowestCassandraVersion();
+        }
+
+        Map aggregated = applyOnEach(ClusterInfo::getLowestCassandraVersion);
+        List versions = aggregated.values()
+                                  .stream()
+                                  .map(CassandraVersionFeatures::cassandraVersionFeaturesFromCassandraVersion)
+                                  .sorted()
+                                  .collect(Collectors.toList());
+        CassandraVersionFeatures first = versions

Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757684085



Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757680508


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java:
##
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
+import org.apache.cassandra.bridge.CassandraVersionFeatures;
+import org.apache.cassandra.spark.bulkwriter.CassandraContext;
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.MapUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider

Review Comment:
   Done






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757679225



Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757674951


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/MultiClusterInfoProvider.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.List;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provider for multiple ClusterInfo and lookup
+ */
+public interface MultiClusterInfoProvider

Review Comment:
   I think `Provider` qualifies here. The interface is to provide the getters 
for clusters. 






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757673435


##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java:
##
@@ -279,4 +282,19 @@ public static  T resolveDeprecated(Map options, String option
 
         return deprecatedOptionValue == null ? resolver.apply(null) : deprecatedOptionValue;
     }
+
+    /**
+     * Get the first map entry from the map
+     *
+     * @return the first map entry, if there is at least one entry in the map
+     * @throws NoSuchElementException if the map is empty
+     */
+    public static  Map.Entry firstEntry(@NotNull Map map)

Review Comment:
   ok. removing the code. It is not as useful. 






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757673277


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java:
##
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
+import org.apache.cassandra.bridge.CassandraVersionFeatures;
+import org.apache.cassandra.spark.bulkwriter.CassandraContext;
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.WriteAvailability;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.MapUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class);
+
+    private static final long serialVersionUID = 5337884321245616172L;
+
+    // immutable
+    private final List clusterInfos;
+    private final Map clusterInfoById;
+    // map key might be stale, e.g. instance is removed from cluster, but it remains in the map. In such case, we do not expect call-sites use the stale key
+    private final Map instanceToClusterMap = new HashMap<>();
+
+    public CassandraClusterInfoGroup(List clusterInfos)
+    {
+        Preconditions.checkArgument(clusterInfos != null && !clusterInfos.isEmpty(),
+                                    "clusterInfos cannot be null or empty");
+        this.clusterInfos = Collections.unmodifiableList(clusterInfos);
+        this.clusterInfoById = clusterInfos.stream().collect(Collectors.toMap(ClusterInfo::clusterId, Function.identity()));
+    }
+
+    @Override
+    public void refreshClusterInfo()
+    {
+        runOnEach(ClusterInfo::refreshClusterInfo);
+    }
+
+    @Override
+    public TokenRangeMapping getTokenRangeMapping(boolean cached)
+    {
+        if (clusterInfos.size() == 1)
+        {
+            return clusterInfos.get(0).getTokenRangeMapping(cached);
+        }
+
+        Map> aggregated = applyOnEach(c -> c.getTokenRangeMapping(cached));
+        // When there are multiple clusters, populate the reverse lookup map when fetching latest or initializing
+        if (!cached || aggregated.isEmpty())
+        {
+            // todo: synchronize and clear the reverse lookup map?
+            instanceToClusterMap.clear();
+            aggregated.forEach((clusterId, mapping) -> mapping.getTokenRanges()
+                                                              .keySet()
+                                                              .forEach(instance -> instanceToClusterMap.put(instance, clusterId)));
+        }
+
+        return TokenRangeMapping.consolidate(new ArrayList<>(aggregated.values()));
+    }
+
+    /**
+     * @return the lowest cassandra version among all clusters
+     */
+    @Override
+    public String getLowestCassandraVersion()
+    {
+        if (clusterInfos.size() == 1)
+        {
+            return clusterInfos.get(0).getLowestCassandraVersion();
+        }
+
+        Map aggregated = applyOnEach(ClusterInfo::getLowestCassandraVersion);
+        List versions = aggregated.values()
+                                  .stream()
+                                  .map(CassandraVersionF

Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757669477


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java:
##
@@ -105,16 +101,145 @@ void testCreateTokenRangeMappingWithPending()
         }
         TokenRangeMapping topology = TokenRangeMapping.create(() -> response,
                                                               () -> Partitioner.Murmur3Partitioner,
-                                                              () -> rf, RingInstance::new);
+                                                              RingInstance::new);
         assertThat(topology.pendingInstances()).hasSize(pendingCount);
     }
 
+    @Test
+    void testConsolidateListOfTokenRangeMappingFails()

Review Comment:
   The `TokenRangeMapping` covers the entire ring and contains only the 
unwrapped ranges already. 
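   
   For illustration, a small sketch of what "unwrapped" means here; the `minToken()`/`maxToken()` accessors on `Partitioner` are assumptions:
   
   ```java
   // A wrap-around range (start, end] with start >= end is split at the ring
   // boundary, so consolidation only ever sees non-wrapping ranges.
   List<Range<BigInteger>> unwrap(BigInteger start, BigInteger end, Partitioner partitioner)
   {
       if (start.compareTo(end) < 0)
       {
           return Collections.singletonList(Range.openClosed(start, end));
       }
       return Arrays.asList(Range.openClosed(start, partitioner.maxToken()),
                            Range.openClosed(partitioner.minToken(), end));
   }
   ```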






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


JeetKunDoug commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757195386


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/MultiClusterInfoProvider.java:
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.List;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provider for multiple ClusterInfo and lookup
+ */
+public interface MultiClusterInfoProvider

Review Comment:
   NIT: this is just `MultiClusterInfo` like the previous PR - no need for 
`Provider` in the interface name.




[PR] Improve build-dependencies.sh script to set CASSANDRA_USE_JDK11 if an… [cassandra-analytics]

2024-09-12 Thread via GitHub


jberragan opened a new pull request, #82:
URL: https://github.com/apache/cassandra-analytics/pull/82

   …alytics is running on JDK11





Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-12 Thread via GitHub


JeetKunDoug commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757005825


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java:
##
@@ -105,16 +101,145 @@ void testCreateTokenRangeMappingWithPending()
         }
         TokenRangeMapping topology = TokenRangeMapping.create(() -> response,
                                                               () -> Partitioner.Murmur3Partitioner,
-                                                              () -> rf, RingInstance::new);
+                                                              RingInstance::new);
         assertThat(topology.pendingInstances()).hasSize(pendingCount);
     }
 
+    @Test
+    void testConsolidateListOfTokenRangeMappingFails()

Review Comment:
   Is it possible to add some tests (or modify the replica building logic) to 
cover the entire token range of the partitioner so we have wrap-around ranges 
in these tests please?
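   
   For illustration, a sketch of such a test; `mockFullRingTokenRangeReplicasResponse` is a hypothetical helper that would build replica sets tiling the whole Murmur3 ring, including the wrap-around range:
   
   ```java
   @Test
   void testCreateTokenRangeMappingCoversFullRing()
   {
       // hypothetical helper: 10 instances, RF 3, ranges spanning the entire ring
       TokenRangeReplicasResponse response = mockFullRingTokenRangeReplicasResponse(10, 3);
       TokenRangeMapping<RingInstance> topology = TokenRangeMapping.create(() -> response,
                                                                           () -> Partitioner.Murmur3Partitioner,
                                                                           RingInstance::new);
       // every instance should own at least one unwrapped range after creation
       assertThat(topology.getTokenRanges().keySet()).hasSize(10);
   }
   ```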






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-11 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1756085954


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java:
##
@@ -113,12 +139,10 @@ Multimap> 
tokenRangesByInstance(List writeRepl
 }
 
 public TokenRangeMapping(Partitioner partitioner,
- ReplicationFactor replicationFactor,

Review Comment:
   Since `TokenRangeMapping` can now be consolidated, it no longer makes sense for it to contain a `ReplicationFactor`, which is tied to a cluster.
   The `TokenRangeMapping`, or topology, only describes the ring layout. `ReplicationFactor` is for consistency coordination and was not part of `TokenRangeMapping` in the first place.
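   
   A sketch of what the decoupling looks like at a call-site; the `replicationFactor()` accessor on `ClusterInfo` is an assumption for illustration:
   
   ```java
   // Topology (ring layout) is consolidated across clusters...
   List<TokenRangeMapping<RingInstance>> perCluster = clusterInfos.stream()
                                                                  .map(c -> c.getTokenRangeMapping(true))
                                                                  .collect(Collectors.toList());
   TokenRangeMapping<RingInstance> topology = TokenRangeMapping.consolidate(perCluster);
   
   // ...while the ReplicationFactor used for consistency checks stays per-cluster.
   ReplicationFactor rf = clusterInfos.get(0).replicationFactor(); // hypothetical accessor
   ```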






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-11 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1756080437


##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java:
##
@@ -39,7 +39,7 @@
 @SuppressWarnings({ "WeakerAccess", "unused" })
 public class CqlTable implements Serializable
 {
-public static final long serialVersionUID = 42L;
+private static final long serialVersionUID = 1018995207366817661L;

Review Comment:
   42 is the ultimate answer, but likely a common one. Changed to a random 
value.






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-11 Thread via GitHub


yifan-c commented on code in PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#discussion_r1756079450


##
cassandra-analytics-common/build.gradle:
##
@@ -41,20 +41,21 @@ publishing {
 }
 
 dependencies {
-implementation "org.slf4j:slf4j-api:${slf4jApiVersion}"
-compileOnly "com.esotericsoftware:kryo-shaded:${kryoVersion}"
-compileOnly "com.google.guava:guava:${guavaVersion}"
-compileOnly 
"com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
-compileOnly "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
-compileOnly "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
+implementation("org.slf4j:slf4j-api:${slf4jApiVersion}")
+compileOnly("com.esotericsoftware:kryo-shaded:${kryoVersion}")
+compileOnly("com.google.guava:guava:${guavaVersion}")
+
compileOnly("com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}")
+compileOnly("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}")
+
compileOnly("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}")
 
-testImplementation "com.google.guava:guava:${guavaVersion}"
-testImplementation 
"com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
-testImplementation 
"com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
-testImplementation 
"com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
-
testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
-
testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}")
-
testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}")
+testImplementation("com.google.guava:guava:${guavaVersion}")
+
testImplementation("com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}")
+
testImplementation("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}")
+
testImplementation("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}")
+testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
+
testImplementation("org.junit.jupiter:junit-jupiter-params:${junitVersion}")
+
testImplementation("org.junit.jupiter:junit-jupiter-engine:${junitVersion}")
+testImplementation("org.assertj:assertj-core:${assertjCoreVersion}")

Review Comment:
   the actual change is this line, adding assertj-core dependency. Other lines 
in the file are changed to unify the dependency declaration style. 






Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-11 Thread via GitHub


yifan-c commented on PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#issuecomment-2345227918

   > I will rebase once #79 is merged
   
   #79 is merged. Rebase is done





Re: [PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters [cassandra-analytics]

2024-09-11 Thread via GitHub


yifan-c merged PR #79:
URL: https://github.com/apache/cassandra-analytics/pull/79





Re: [PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]

2024-09-11 Thread via GitHub


jberragan commented on PR #81:
URL: 
https://github.com/apache/cassandra-analytics/pull/81#issuecomment-2345106427

   Green CircleCI: 
https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/100/workflows/813b00fb-1807-4471-bb94-d2685bbe7c91





Re: [PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters [cassandra-analytics]

2024-09-11 Thread via GitHub


JeetKunDoug commented on code in PR #79:
URL: 
https://github.com/apache/cassandra-analytics/pull/79#discussion_r1755318108


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##
@@ -223,7 +227,8 @@ public BulkSparkConf(SparkConf conf, Map options)
         }
         this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L);
         this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null);
-
+        this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null);
+        this.coordinatedWriteConf = buildCoordinatedWriteConf(); // must only call the build method at this step, after the related fields have been resolved

Review Comment:
   If, instead, you passed the necessary parameters to 
`buildCoordinatedWriteConf` you could ensure the order was preserved, rather 
than just having a comment here. Move the build method to CoordinatedWriteConf 
as a static method and call it where necessary.
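   
   A sketch of the suggested shape, reusing the `fromJson` factory quoted further down; the `create` name and null-handling are assumptions for illustration:
   
   ```java
   // Static factory on CoordinatedWriteConf: everything it needs is passed in,
   // so field-initialization order in BulkSparkConf no longer matters.
   @Nullable
   public static CoordinatedWriteConf create(@Nullable String json,
                                             Class<? extends ClusterConfProvider> clusterConfType)
   {
       if (StringUtils.isEmpty(json))
       {
           return null; // coordinated write not configured
       }
       return fromJson(json, clusterConfType);
   }
   ```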



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##
@@ -282,6 +287,63 @@ Set sidecarContactPoints()
 return sidecarContactPoints;
 }
 
+public boolean isCoordinatedWriteConfigured()
+{
+return coordinatedWriteConf != null;
+}
+
+public CoordinatedWriteConf coordinatedWriteConf()
+{
+if (coordinatedWriteConf == null)
+{
+coordinatedWriteConf = buildCoordinatedWriteConf();
+}
+
+return coordinatedWriteConf;
+}
+
+@Nullable
+protected CoordinatedWriteConf buildCoordinatedWriteConf()

Review Comment:
   See above - this logic I think really belongs in 
CoordinatedWriteConf.build() or something.



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java:
##
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.common.SidecarInstanceFactory;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+/**
+ * Data class containing the configurations required for coordinated write.
+ * The serialization format is JSON string. The class takes care of 
serialization and deserialization.
+ */
+public class CoordinatedWriteConf
+{
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+// The runtime type of ClusterConfProvider is erased; use clustersOf 
method to read the desired type back
+private final Map clusters;
+
+/**
+ * Parse JSON string and create a CoordinatedWriteConf object with the 
specified ClusterConfProvider format
+ *
+ * @param json JSON string
+ * @param clusterConfType concrete type of ClusterConfProvider that can be 
used for JSON serialization and deserialization
+ * @return CoordinatedWriteConf object
+ * @param  subtype of ClusterConfProvider
+ */
+public static 
+CoordinatedWriteConf fromJson(String json, Class clusterConfType)
+{
+JavaType javaType = 
TypeFactory.defaultInstance().constructMapType(Map.class, String.class, 
clusterConfType);
+   

[PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]

2024-09-11 Thread via GitHub


jberragan opened a new pull request, #81:
URL: https://github.com/apache/cassandra-analytics/pull/81

   Fix Scala 2.13 build





Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-09-11 Thread via GitHub


frankgh merged PR #71:
URL: https://github.com/apache/cassandra-analytics/pull/71





Re: [I] Donate GoCQL to Apache Cassandra [YOUR ACTION REQUIRED] [cassandra-gocql-driver]

2024-09-11 Thread via GitHub


mattheath commented on issue #1751:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1751#issuecomment-234838

   @michaelsembwever I'm happy to assign any contributions I've made to this to 
the ASF. Sorry for the delay! 





Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-10 Thread via GitHub


yifan-c commented on PR #80:
URL: 
https://github.com/apache/cassandra-analytics/pull/80#issuecomment-2342424833

   I will rebase once https://github.com/apache/cassandra-analytics/pull/79 is 
merged 





[PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]

2024-09-10 Thread via GitHub


yifan-c opened a new pull request, #80:
URL: https://github.com/apache/cassandra-analytics/pull/80

   (no comment)





Re: [PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONF to define coordinated write to multiple Cassandra clusters [cassandra-analytics]

2024-09-10 Thread via GitHub


yifan-c commented on code in PR #79:
URL: 
https://github.com/apache/cassandra-analytics/pull/79#discussion_r1752673693


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.coordinatedwrite;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConfProvider;
+import 
org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class CoordinatedWriteConfTest

Review Comment:
   I believe the test case is already there.



##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java:
##
@@ -275,6 +277,70 @@ void testSidecarContactPoints()
 .isNull();
 }
 
+@Test
+void testReadCoordinatedWriteConfFails()

Review Comment:
   I believe the test case is already there.






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-09-10 Thread via GitHub


jberragan commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1752638296


##
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java:
##
@@ -176,16 +176,18 @@ public CassandraBridgeImplementation()
 kryoSerializers.put(CqlField.class, new 
CqlField.Serializer(cassandraTypes()));
 kryoSerializers.put(CqlTable.class, new 
CqlTable.Serializer(cassandraTypes()));
 kryoSerializers.put(CqlUdt.class, new 
CqlUdt.Serializer(cassandraTypes()));
-
-nativeTypes = 
allTypes().stream().collect(Collectors.toMap(CqlField.CqlType::name, 
Function.identity()));
 }
 
-@Override
 public CassandraTypes cassandraTypes()
 {
 return CassandraTypesImplementation.INSTANCE;
 }
 
+public SparkSqlTypeConverter typeConverter()
+{
+return SparkSqlTypeConverterImplementation.INSTANCE;

Review Comment:
   We could remove the Spark dependency from `cassandra-bridge` but it would be 
a lot more work and probably better to do outside this PR.






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-09-10 Thread via GitHub


jberragan commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1752631902


##
cassandra-bridge/build.gradle:
##
@@ -40,6 +40,7 @@ publishing {
 
 dependencies {
 api(project(':cassandra-analytics-common'))
+api(project(':cassandra-analytics-spark-converter'))

Review Comment:
   I think bridge is the connecting point between Cassandra and Spark. It 
already depends on the `spark-core`, `spark_sql`  jars.






[PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONF to define coordinated write to multiple Cassandra clusters [cassandra-analytics]

2024-09-10 Thread via GitHub


yifan-c opened a new pull request, #79:
URL: https://github.com/apache/cassandra-analytics/pull/79

   The option specifies the configuration (in JSON) for coordinated write. See 
   org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf. 
   When this option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES, and 
   LOCAL_DC are ignored.
   
   Patch by Yifan Cai; Reviewed by TBD for CASSANDRA-19909





Re: [I] How to set limit for pool connection [cassandra-gocql-driver]

2024-09-10 Thread via GitHub


OleksiienkoMykyta commented on issue #1741:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1741#issuecomment-2340521705

   NumConns sets the initial number of connections per node (2 by default), not 
   the maximum number of connections per node. Also, by default gocql maintains 
   a minimum number of connections, but as demand increases it will open more 
   connections to handle the load. I think you are looking for limiting 
   functionality similar to what the Python and Java drivers have. So we could 
   add MaxNumberOfConnections or something similar to set the desired limit, 
   and we should add a detailed description of NumConns to the documentation.
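   
   For illustration, a minimal sketch of today's knob next to the proposed cap 
   (MaxNumberOfConnections is hypothetical and does not exist yet):
   ```go
   package main
   
   import "github.com/gocql/gocql"
   
   func main() {
   	cluster := gocql.NewCluster("127.0.0.1")
   	// Existing option: connections opened per node up front (default 2);
   	// the pool can still grow beyond this under load.
   	cluster.NumConns = 4
   	// Proposed (hypothetical) hard cap, as discussed above:
   	// cluster.MaxNumberOfConnections = 16
   	session, err := cluster.CreateSession()
   	if err != nil {
   		panic(err)
   	}
   	defer session.Close()
   }
   ```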





Re: [PR] CASSANDRA-19900: Make the compression cache configurable to reduce he… [cassandra-analytics]

2024-09-09 Thread via GitHub


belliottsmith commented on code in PR #77:
URL: 
https://github.com/apache/cassandra-analytics/pull/77#discussion_r1749723810


##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java:
##
@@ -40,6 +40,8 @@ public final class Properties
 public static final int DEFAULT_MAX_RETRIES = 10;
 public static final long DEFAULT_MILLIS_TO_SLEEP = 500;
 public static final int DEFAULT_MAX_POOL_SIZE = 64;
+public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA_KEY = true;
+public static final int DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 
8388608; // 8MiB

Review Comment:
   For future reference, each left shift by 10 multiplies by 1,024 (KiB), so 
   8 << 20 is a clean way to say 8 mebibytes.
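   
   A quick self-contained illustration of the idiom:
   ```java
   public class ShiftDemo
   {
       public static void main(String[] args)
       {
           int kib = 1 << 10;      // 1,024
           int mib = 1 << 20;      // 1,048,576
           int eightMib = 8 << 20; // 8,388,608 == 8 MiB
           System.out.println(eightMib == 8388608); // prints true
       }
   }
   ```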






Re: [PR] CASSANDRA-19901: Refactor TokenRangeMapping to use proper types instead of Strings [cassandra-analytics]

2024-09-06 Thread via GitHub


frankgh commented on code in PR #78:
URL: 
https://github.com/apache/cassandra-analytics/pull/78#discussion_r1747575920


##
scripts/build-dtest-jars.sh:
##
@@ -118,4 +118,6 @@ else
 exit ${RETURN}
 fi
   done
+  # always delete the Cassandra source after dtest.jar is built to avoid 
confusing IDE

Review Comment:
   can we alternatively exclude these files from the IDE's indexing?



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java:
##
@@ -22,55 +22,131 @@
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeMap;
 import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
 import 
o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
-import org.apache.cassandra.spark.bulkwriter.RingInstance;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.jetbrains.annotations.Nullable;
 
-// TODO: refactor to improve the return types of methods to use `Instance` 
instead of String and cleanup
-public class TokenRangeMapping implements 
Serializable
+public class TokenRangeMapping implements 
Serializable
 {
 private static final long serialVersionUID = -7284933683815811160L;
+private static final Logger LOGGER = 
LoggerFactory.getLogger(TokenRangeMapping.class);
+
 private final Partitioner partitioner;
 private final ReplicationFactor replicationFactor;
-private final transient Set replacementInstances;
-private final transient RangeMap> 
replicasByTokenRange;
-private final transient Multimap> 
tokenRangeMap;
-private final transient Map> writeReplicasByDC;
-private final transient Map> pendingReplicasByDC;
-private final transient List replicaMetadata;
+private final transient Set allInstances;
+private final transient RangeMap> replicasByTokenRange;
+private final transient Multimap> tokenRangeMap;
+private final transient Map> writeReplicasByDC;
+private final transient Map> pendingReplicasByDC;
+
+public static 
+TokenRangeMapping create(Supplier 
topologySupplier,
+Supplier partitionerSupplier,
+Supplier 
replicationFactorSupplier,
+Function instanceCreator)
+{
+TokenRangeReplicasResponse response = topologySupplier.get();
+Map instanceByIpAddress = new 
HashMap<>(response.replicaMetadata().size());
+response.replicaMetadata()
+.forEach((ipAddress, metadata) -> 
instanceByIpAddress.put(ipAddress, instanceCreator.apply(metadata)));
+
+Multimap> tokenRangesByInstance = 
tokenRangesByInstance(response.writeReplicas(),
+   
  instanceByIpAddress);
+
+// Each token range has hosts by DC. We collate them across all ranges 
into all hosts by DC
+Map> writeReplicasByDC = new HashMap<>();
+Map> pendingReplicasByDC = new HashMap<>();
+Set allInstances = new HashSet<>(instanceByIpAddress.values());
+for (I instance : allInstances)
+{
+Set dc = 
writeReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>());
+dc.add(instance);
+if (instance.nodeState().isPending)
+{
+Set pendingInDc = 
pendingReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new 
HashSet<>());
+pendingInDc.add(instance);
+}
+}
+
+if (LOGGER.isDebugEnabled())
+{
+LOGGER.debug("Fetched token-ranges with dcs={}, 
write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+ 
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+ 
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
+}
+
+Map replicaMetadata = 
response.r

Re: [PR] CASSANDRA-19900: Make the compression cache configurable to reduce he… [cassandra-analytics]

2024-09-05 Thread via GitHub


frankgh commented on code in PR #77:
URL: 
https://github.com/apache/cassandra-analytics/pull/77#discussion_r1746183779


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java:
##
@@ -210,6 +212,8 @@ public static final class ClientConfig
 public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = 
"chunkBufferSizeBytes";
 public static final String MAX_POOL_SIZE_KEY = "maxPoolSize";
 public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds";
+public static final String CACHE_COMPRESSION_METADATA_KEY = 
"cacheCompressionMetadata";
+public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 
"maxCacheCompressionMetadata";

Review Comment:
   Can we add the units for this param?
   ```suggestion
   public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 
"maxCacheCompressionMetadataMiB";
   ```



##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java:
##
@@ -40,6 +40,8 @@ public final class Properties
 public static final int DEFAULT_MAX_RETRIES = 10;
 public static final long DEFAULT_MILLIS_TO_SLEEP = 500;
 public static final int DEFAULT_MAX_POOL_SIZE = 64;
+public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA_KEY = true;
+public static final int DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 
8388608; // 8MiB

Review Comment:
   ```suggestion
   public static final int DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY 
= 8 * MEBI_BYTES; // 8MiB
   ```



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java:
##
@@ -398,7 +414,19 @@ public static ClientConfig create(int userProvidedPort,
 maxPoolSize,
 timeoutSeconds,
 maxBufferOverride,
-chunkBufferOverride);
+chunkBufferOverride,
+cacheCompressionMetadata,
+maxSizeCacheCompressionMetadata);
+}
+
+public int maxSizeCacheCompressionMetadata()

Review Comment:
   can we add the units here as well?






Re: [PR] CASSANDRA-19873 Removes checks for blocked instances from bulk-write … [cassandra-analytics]

2024-09-05 Thread via GitHub


frankgh merged PR #76:
URL: https://github.com/apache/cassandra-analytics/pull/76





Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-09-05 Thread via GitHub


jberragan commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1745910617


##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import org.jetbrains.annotations.NotNull;
+
+public interface TypeConverter
+{
+default Object convert(CqlField.CqlType cqlType, @NotNull Object value)
+{
+return convert(cqlType, value, false);
+}

Review Comment:
   ok






Re: [I] Donate GoCQL to Apache Cassandra [YOUR ACTION REQUIRED] [cassandra-gocql-driver]

2024-09-04 Thread via GitHub


Drahflow commented on issue #1751:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1751#issuecomment-2329826936

   @michaelsembwever I have received copyright-assignment back from the client 
who paid for my (minor) contributions, and am happy to donate said copyright to 
the ASF.





Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-09-03 Thread via GitHub


jberragan commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1742768323


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java:
##
@@ -152,151 +153,153 @@ public void testUUID()
 public void testLong()
 {
 qt().forAll(TestUtils.bridges()).checkAssert(bridge -> {
-
assertTrue(bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE))
 instanceof Long);
-assertEquals(Long.MAX_VALUE, bridge.bigint().deserialize(
-ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE)));
+
assertTrue(bridge.bigint().deserializeToType(bridge.typeConverter(), 
bridge.bigint().serialize(Long.MAX_VALUE)) instanceof Long);
+assertEquals(Long.MAX_VALUE, 
bridge.bigint().deserializeToType(bridge.typeConverter(),
+   
ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE)));
 qt().forAll(integers().all())
-.checkAssert(integer -> assertEquals((long) integer, 
bridge.bigint().deserialize(
-bridge.bigint().serialize((long) integer;
-assertEquals(Long.MAX_VALUE, 
bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE)));
-assertEquals(Long.MIN_VALUE, 
bridge.bigint().deserialize(bridge.bigint().serialize(Long.MIN_VALUE)));
+.checkAssert(integer -> assertEquals((long) integer, 
bridge.bigint().deserializeToType(bridge.typeConverter(),
+   
bridge.bigint().serialize((long) integer;
+assertEquals(Long.MAX_VALUE, 
bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MAX_VALUE)));
+assertEquals(Long.MIN_VALUE, 
bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MIN_VALUE)));
 qt().withExamples(MAX_TESTS)
 .forAll(longs().all())
-.checkAssert(aLong -> assertEquals(aLong, 
bridge.bigint().deserialize(
-bridge.bigint().serialize(aLong;
+.checkAssert(aLong -> assertEquals(aLong, 
bridge.bigint().deserializeToType(bridge.typeConverter(),
+   
 bridge.bigint().serialize(aLong;
 });
 }
 
 @Test
 public void testDecimal()
 {
 qt().forAll(TestUtils.bridges()).checkAssert(bridge -> {
-assertTrue(bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(500L))) 
instanceof Decimal);
-assertEquals(Decimal.apply(500), bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(500L;
-assertNotSame(Decimal.apply(501), bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(500L;
-assertEquals(Decimal.apply(-1), bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(-1L;
-assertEquals(Decimal.apply(Long.MAX_VALUE), 
bridge.decimal().deserialize(
-
bridge.decimal().serialize(BigDecimal.valueOf(Long.MAX_VALUE;
-assertEquals(Decimal.apply(Long.MIN_VALUE), 
bridge.decimal().deserialize(
-
bridge.decimal().serialize(BigDecimal.valueOf(Long.MIN_VALUE;
-assertEquals(Decimal.apply(Integer.MAX_VALUE), 
bridge.decimal().deserialize(
-
bridge.decimal().serialize(BigDecimal.valueOf(Integer.MAX_VALUE;
-assertEquals(Decimal.apply(Integer.MIN_VALUE), 
bridge.decimal().deserialize(
-
bridge.decimal().serialize(BigDecimal.valueOf(Integer.MIN_VALUE;
+
assertTrue(bridge.decimal().deserializeToType(bridge.typeConverter(),
+  
bridge.decimal().serialize(BigDecimal.valueOf(500L))) instanceof Decimal);
+assertEquals(Decimal.apply(500), 
bridge.decimal().deserializeToType(bridge.typeConverter(),
+   
 bridge.decimal().serialize(BigDecimal.valueOf(500L;
+assertNotSame(Decimal.apply(501), 
bridge.decimal().deserializeToType(bridge.typeConverter(),
+   
  bridge.decimal().serialize(BigDecimal.valueOf(500L;
+assertEquals(Decimal.apply(-1), 
bridge.decimal().deserializeToType(bridge.typeConverter(),
+   
bridge.decimal().serialize(BigDecimal.valueOf(-1L;
+assertEquals(Decimal.apply(Long.MAX_VALUE), 
bridge.decimal().deserializeToT

Re: [I] Feature Request: Support Vector Type [cassandra-gocql-driver]

2024-09-03 Thread via GitHub


rcosnita commented on issue #1734:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1734#issuecomment-2327419047

   Hello everyone,
   
   For people who are using version 4 of the protocol but need to write vectors 
   from their code base, here is a possible idea:
   
   ```go
   package warm
   
   import (
   	"fmt"
   	"math"
   
   	"github.com/gocql/gocql"
   )
   
   func encInt(v int32) []byte {
return []byte{byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)}
   }
   
   func encFloat(v float32) []byte {
return encInt(int32(math.Float32bits(v)))
   }
   
   type Float32Vector struct {
value  []float32
dimensions int
   }
   
   func (m *Float32Vector) MarshalCQL(info gocql.TypeInfo) ([]byte, error) {
if len(m.value) != m.dimensions {
return nil, fmt.Errorf("float32vector expects size %d but 
received size %d",
m.dimensions, len(m.value))
}
   
var results []byte
for _, part := range m.value {
results = append(results, encFloat(part)...)
}
   
return results, nil
   }
   
   func (m *Float32Vector) UnmarshalCQL(info gocql.TypeInfo, data []byte) error 
{
panic("unmarshalling vector is not fully implemented")
   }
   
   func (m *Float32Vector) Value() []float32 {
return m.value
   }
   
   func ensureVectorDimension(values []float32, dimensions int) []float32 {
if len(values) == dimensions {
return values
}
   
delta := dimensions - len(values)
result := make([]float32, dimensions)
   	for idx, v := range values {
   		if idx < dimensions {
   			result[idx] = v
   		} else {
   			break
   		}
   	}
   
for idx := len(values); idx < delta+len(values); idx++ {
result[idx] = 0.0
}
   
return result
   }
   
   func FromVectorFloat32(value []float32, dimensions int) *Float32Vector {
return &Float32Vector{
value:  ensureVectorDimension(value, dimensions),
dimensions: dimensions,
}
   }
   ```
   
   You can opt to skip the padding completely and return an error if that feels 
more natural for your use case. The encoding functions are extracted from the 
existing gocql codebase.
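   
   A hypothetical usage sketch (the table, column, and session variable are 
   made up for illustration; assumes a `vector<float, 3>` column and the usual 
   imports):
   ```go
   // Pads [0.1, 0.2] to [0.1, 0.2, 0.0] to match the declared dimension.
   vec := FromVectorFloat32([]float32{0.1, 0.2}, 3)
   if err := session.Query(
   	"INSERT INTO ks.items (id, embedding) VALUES (?, ?)", 1, vec,
   ).Exec(); err != nil {
   	log.Fatal(err)
   }
   ```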





Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-09-03 Thread via GitHub


OleksiienkoMykyta commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2326628144

   @Rikkuru Thank you for providing the details. I reproduced the issue you 
   faced, and as you said, according to the documentation it is misbehavior. I 
   added a small fix, retested it, and it works properly now.





Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-09-02 Thread via GitHub


Rikkuru commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2325019139

   > Please share an exec query you are using to reproduce the issue, if you 
tried multiple queries provide them as well.
   
   Hi, I have an example.
   
   main.go (go 1.21.11, github.com/gocql/gocql v1.6.0)
   
   There is a loop to wait for ScyllaDB to start, and a second loop to allow 
   time to bring down one of the nodes.
   ```
   package main
   
   import (
"context"
"fmt"
"time"
   
"github.com/gocql/gocql"
   )
   
   type MyRetryPolicy struct {
   }
   
   func (*MyRetryPolicy) Attempt(q gocql.RetryableQuery) bool {
if q.Attempts() > 2 {
return false
}
return true
   }
   
   func (*MyRetryPolicy) GetRetryType(error) gocql.RetryType {
return gocql.Retry
   }
   
   type LoggingObserver struct{}
   
   func (*LoggingObserver) ObserveQuery(ctx context.Context, q 
gocql.ObservedQuery) {
fmt.Printf("observer attempt: %d err: %s\n", q.Attempt, q.Err)
   }
   
   func main() {
cluster := gocql.NewCluster("some-scylla", "some-scylla3", 
"some-scylla2")
var err error
var s *gocql.Session
for i := 1; i <= 100; i++ {
fmt.Printf("%d attempt\n", i)
s, err = gocql.NewSession(*cluster)
if err == nil {
break
}
time.Sleep(1 * time.Second)
}
if err != nil {
fmt.Printf("session err: %s\n", err)
return
}
defer s.Close()
   
fmt.Printf("CONNECTED!")
   
time.Sleep(60 * time.Second)
for i := 1; i <= 100; i++ {
q := s.Query("INSERT INTO  mykeyspace.events(event_id, time, 
args) VALUES (?,?,?)", 1, gocql.UUIDFromTime(time.Now()), "test")
   
q.RetryPolicy(&MyRetryPolicy{})
q.Observer(&LoggingObserver{})
q.Consistency(gocql.All)
   
fmt.Printf("%d QUERY IsIdempotent: %v\n", i, q.IsIdempotent())
   
err = q.Exec()
if err != nil {
fmt.Printf("exec err: %s\n", err)
}
time.Sleep(1 * time.Second)
}
   }
   ```
   
   CQL init for Scylla:
   ```
   CREATE KEYSPACE mykeyspace WITH replication = {'class': 
'NetworkTopologyStrategy', 'datacenter1': '3'}  AND durable_writes = true;
   CREATE TABLE mykeyspace.events (event_id int, time timeuuid, args text, 
PRIMARY KEY ((event_id), time));
   ```
   
   docker-compose for nodes and main.go 
   ```
   version: '3'
   
   services:
 some-scylla:
   image: scylladb/scylla:5.1
   container_name: some-scylla
   command: --overprovisioned 1 --smp 1
   ports:
 - 9042:9042
   
 some-scylla2:
   image: scylladb/scylla:5.1
   container_name: some-scylla2
   command: --seeds=some-scylla --overprovisioned 1 --smp 1
   
 some-scylla3:
   image: scylladb/scylla:5.1
   container_name: some-scylla3
   command: --seeds=some-scylla --overprovisioned 1 --smp 1
   
 myapp2:
   image: golang:1.21.11
   container_name: myapp2
   command: go run /app/main.go
   working_dir: /app
   volumes:
   - ./:/app
   
   ```
   
   
   I can see in the log that the query is retried:
   ```
   2024-09-02 18:40:30 90 QUERY IsIdempotent: false
   2024-09-02 18:40:30 observer attempt: 0 err: Cannot achieve consistency 
level for cl ALL. Requires 3, alive 2
   2024-09-02 18:40:30 observer attempt: 1 err: Cannot achieve consistency 
level for cl ALL. Requires 3, alive 2
   2024-09-02 18:40:30 observer attempt: 2 err: Cannot achieve consistency 
level for cl ALL. Requires 3, alive 2
   2024-09-02 18:40:30 exec err: Cannot achieve consistency level for cl ALL. 
Requires 3, alive 2
   ```
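   
   For reference, gocql queries are non-idempotent unless explicitly marked; 
   the point of this issue is that retries happen even when the flag is left 
   false. Opting in looks like this minimal sketch:
   ```go
   q := s.Query("INSERT INTO mykeyspace.events(event_id, time, args) VALUES (?,?,?)",
   	1, gocql.UUIDFromTime(time.Now()), "test")
   q.Idempotent(true)            // mark the query as safe to retry
   fmt.Println(q.IsIdempotent()) // now prints true
   ```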





Re: [I] Retry type Ignore does not clear error [cassandra-gocql-driver]

2024-09-02 Thread via GitHub


Rikkuru commented on issue #1808:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1808#issuecomment-2324611544

   Example (on an empty cluster; the keyspace error is fine for our test).
   
   MyRetryPolicy can return Ignore or Rethrow; it changes nothing either way.
   
   ```
   package main
   
   import (
"context"
"fmt"
"time"
   
"github.com/gocql/gocql"
   )
   
   type MyRetryPolicy struct {
   }
   
   func (*MyRetryPolicy) Attempt(q gocql.RetryableQuery) bool {
if q.Attempts() > 2 {
return false
}
return true
   }
   
   func (*MyRetryPolicy) GetRetryType(error) gocql.RetryType {
return gocql.Ignore
   }
   
   type LoggingObserver struct{}
   
   func (*LoggingObserver) ObserveQuery(ctx context.Context, q 
gocql.ObservedQuery) {
fmt.Printf("observer attempt: %d err: %s\n", q.Attempt, q.Err)
   }
   
   func main() {
cluster := gocql.NewCluster("node-1")
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: "scylla",
Password: "***",
}
   
s, err := gocql.NewSession(*cluster)
if err != nil {
fmt.Printf("session err: %s\n", err)
}
defer s.Close()
   
q := s.Query("INSERT INTO  my.events(event_id, time, args) VALUES 
(?,?,?)", 1, gocql.UUIDFromTime(time.Now()), "test")
   
q.RetryPolicy(&MyRetryPolicy{})
q.Observer(&LoggingObserver{})
   
fmt.Printf("QUERY IsIdempotent: %v\n", q.IsIdempotent())
   
err = q.Exec()
if err != nil {
fmt.Printf("exec err: %s\n", err)
}
   
   }
   
   ```
   
   Output
   ```
   ➜  test go run main.go
   QUERY IsIdempotent: false
   observer attempt: 0 err: Keyspace my does not exist
   exec err: Keyspace my does not exist
   ```





[I] Retry type Ignore does not clear error [cassandra-gocql-driver]

2024-09-02 Thread via GitHub


Rikkuru opened a new issue, #1808:
URL: https://github.com/apache/cassandra-gocql-driver/issues/1808

   ### What version of Cassandra are you using?
   Scylla Enterprise 2024.1.8
   
   ### What version of Gocql are you using?
   github.com/gocql/gocql v1.6.0
   
   ### What version of Go are you using?
   go 1.21.11
   
   ### What did you do?
   Used RetryType Ignore (https://pkg.go.dev/github.com/gocql/gocql#RetryType) 
   on a write query.
   
   
   ### What did you expect to see?
   I expected that query.Exec() would not return an error.
   
   ### What did you see instead?
   The error is returned.
   
   ---
   
   Maybe the documentation is not clear on how Ignore and Rethrow are 
different? 
   
   I thought Ignore would make it so that Exec does not return an error? Maybe 
   this would be strange on read ops, but it could be used on writes (in a 
   downgrading-CL retry policy, for example). But they both return the error, 
   and I don't see any difference between those retry types.
   
   Is there any difference between the Rethrow and Ignore retry types?
   





Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-30 Thread via GitHub


yifan-c merged PR #75:
URL: https://github.com/apache/cassandra-analytics/pull/75





Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-08-30 Thread via GitHub


yifan-c commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1739179691


##
cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/BinaryTraits.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.converter.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.jetbrains.annotations.NotNull;
+
+public interface BinaryTraits extends SparkType

Review Comment:
   Thank you






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-08-30 Thread via GitHub


jberragan commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1739087029


##
cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/BinaryTraits.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.converter.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.jetbrains.annotations.NotNull;
+
+public interface BinaryTraits extends SparkType

Review Comment:
   I'll go ahead and remove it, but I think "trait" or "mixin" is a common term 
   not necessarily tied to any particular language.






Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-08-30 Thread via GitHub


Rikkuru commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2320933671

   ```
   SELECT event_id, time, args
FROM my.events
WHERE user = ? AND project_id = ?
AND year = ? AND week = ?
ORDER BY time DESC
LIMIT ?
   ```
   
   Something like that on read, with:
   ```
   CREATE TABLE my.events (
   user text,
   year smallint,
   week tinyint,
   project_id smallint,
   time timeuuid,
   args map,
   event_id smallint,
   ip inet,
   PRIMARY KEY ((user, year, week, project_id), time)
   );
   ```
   
   I will try to find time over the weekend to create a simpler example in a 
   sandbox.





[I] Trouble with DCAware or RackAware HostSelectionPolicy [cassandra-gocql-driver]

2024-08-30 Thread via GitHub


ut0mt8 opened a new issue, #1807:
URL: https://github.com/apache/cassandra-gocql-driver/issues/1807

   ### What version of Cassandra are you using?
   Cassandra 3.11.14 | CQL spec 3.4.4 | Native protocol v4
   
   ### What version of Gocql are you using?
   v1.6.0
   
   ### What version of Go are you using?
   go version go1.22.2
   
   ### What did you do?
   I have a worldwide Cassandra cluster divided into 3 regions (DCs) and 
   multiple AZs (racks). I want to connect and read as locally as possible 
   (at the rack level). So I am testing: just connecting and running a few 
   SELECTs to see where reads and connections are distributed.
   
   ### What did you expect to see?
   I expect queries to go only to my local DC or local rack.
   
   ### What did you see instead?
   Instead, they are randomly distributed among the worldwide nodes. Note that 
   with HostFilter it works (but it is not convenient at all). It is as if the 
   driver does not understand my cluster topology; see the clues in the debug 
   output (if I read the code correctly, the two node IPs below mean global).
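   
   For reference, a minimal sketch of pinning queries to the local DC/rack with 
   gocql's built-in policies (DC and rack names are taken from the nodetool 
   output below; verify the policy constructors against your driver version):
   ```go
   cluster := gocql.NewCluster("10.0.111.32")
   // Prefer replicas in eu-west-1 / rack eu-west-1a, token-aware first:
   cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
   	gocql.RackAwareRoundRobinPolicy("eu-west-1", "eu-west-1a"),
   )
   // Or DC-level affinity only:
   // cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
   // 	gocql.DCAwareRoundRobinPolicy("eu-west-1"),
   // )
   ```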
   
   ---
   
   ### Describe your Cassandra cluster
   please provide the following information
   
   - output of `nodetool status`
   
   `
   Datacenter: ap-southeast-1
   ==
   Status=Up/Down
   |/ State=Normal/Leaving/Joining/Moving
   --  AddressLoad   Tokens   Owns (effective)  Host ID 
  Rack
   UN  10.80.197.48   84.27 GiB  256  17.1% 
4d1a1159-fc5d-431c-a420-8bff72ef2484  ap-southeast-1b
   UN  10.80.111.193  79.07 GiB  256  16.6% 
0a69d219-f42f-4de9-9c8d-7efc06355e59  ap-southeast-1a
   UN  10.80.197.197  85.49 GiB  256  17.7% 
bb87ff84-4b68-46d9-87a3-e1e8d0d6fe07  ap-southeast-1b
   UN  10.80.197.198  78.92 GiB  256  17.6% 
db91859b-4759-4d69-ab99-65ffe97a9deb  ap-southeast-1b
   UN  10.80.197.103  79.37 GiB  256  16.5% 
ab22efbd-585a-4c4e-adb9-b0ce4a56e8f0  ap-southeast-1b
   UN  10.80.197.104  66.52 GiB  256  14.6% 
552d1660-fec5-4205-bfa6-992a30ec702c  ap-southeast-1b
   UN  10.80.197.121  77.52 GiB  256  16.6% 
d326d322-ca5e-4d6d-b052-2971c6ea57cf  ap-southeast-1b
   UN  10.80.111.234  77.78 GiB  256  16.1% 
e0974eb1-7b9b-487c-bcbe-8e16f50d6508  ap-southeast-1a
   UN  10.80.111.235  75.12 GiB  256  16.7% 
29d1bcc3-4a60-477d-8980-d6b7083d2541  ap-southeast-1a
   UN  10.80.111.189  84.49 GiB  256  17.5% 
6d7ea647-5a14-4462-9118-ce20ce6b4602  ap-southeast-1a
   UN  10.80.111.126  84.62 GiB  256  17.1% 
544ce9c7-71ea-47c7-80c7-32dc06ed6b5e  ap-southeast-1a
   UN  10.80.111.190  74.33 GiB  256  16.1% 
650975be-09cf-4db5-9a45-3f99a03bdfea  ap-southeast-1a
   Datacenter: eu-west-1
   =
   Status=Up/Down
   |/ State=Normal/Leaving/Joining/Moving
   --  AddressLoad   Tokens   Owns (effective)  Host ID 
  Rack
   UN  10.0.111.3299.37 GiB  256  33.6% 
7f54a4b0-b30a-4043-854a-7efec3604463  eu-west-1a
   UN  10.0.111.145   96.86 GiB  256  32.3% 
1222059b-bf71-4df9-ab78-2d9fd420f29b  eu-west-1a
   UN  10.0.111.17105.54 GiB  256  35.0% 
42d01053-1bbe-4fef-aed8-8f3bc201fab5  eu-west-1a
   UN  10.0.111.6586.03 GiB  256  32.0% 
f4a7b146-80a1-408e-a31a-893e3465a271  eu-west-1a
   UN  10.0.111.146   91.01 GiB  256  32.9% 
485334d7-11c6-4716-bbe1-4f09db7c41eb  eu-west-1a
   UN  10.0.111.147   98.04 GiB  256  33.9% 
62377c8a-61bd-4bb2-b434-c607a5eba1d2  eu-west-1a
   UN  10.0.197.246   95.4 GiB   256  32.4% 
31905c62-dd32-4acc-b247-90899f8069ca  eu-west-1b
   UN  10.0.197.8898.83 GiB  256  34.6% 
8b42330a-e209-4e43-9174-e7b8c23eaa80  eu-west-1b
   UN  10.0.197.122   106.38 GiB  256  34.8% 
1336edb6-5dec-4826-b263-4263b7ff24d3  eu-west-1b
   UN  10.0.197.7494.17 GiB  256  32.4% 
fe63b3cb-f129-4bd8-b879-63b74730d324  eu-west-1b
   UN  10.0.197.123   92.88 GiB  256  33.7% 
440d4fa5-2f45-472d-8721-fbda44401804  eu-west-1b
   UN  10.0.197.124   89.06 GiB  256  32.4% 
45ba9039-14e4-4082-9243-e5b7465be36f  eu-west-1b
   Datacenter: us-east-1
   =
   Status=Up/Down
   |/ State=Normal/Leaving/Joining/Moving
   --  AddressLoad   Tokens   Owns (effective)  Host ID 
  Rack
   UN  10.66.201.210  102.07 GiB  256  24.8% 
7f369bcf-12c1-4d3a-aae2-53fe06bbff06  us-east-1c
   UN  10.66.200.192  104.88 GiB  256  25.3% 
dbd379f8-545a-412c-82e1-c218f19906c3  us-east-1d
   UN  10.66.111.5102.18 GiB  256  24.4% 
0148c92a-482a-4b5d-98df-d1ac88b40ffe  us-eas

Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-08-30 Thread via GitHub


OleksiienkoMykyta commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2320418177

   Please share the exec query you are using to reproduce the issue; if you 
   tried multiple queries, provide them as well.





Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-29 Thread via GitHub


yifan-c commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737411842


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -227,10 +230,18 @@ protected String getCurrentKeyspaceSchema() throws 
Exception
 return schemaResponse.schema();
 }
 
-private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() throws 
ExecutionException, InterruptedException
+private TokenRangeReplicasResponse getTokenRangesAndReplicaSets()
 {
 CassandraContext context = getCassandraContext();
-return context.getSidecarClient().tokenRangeReplicas(new 
ArrayList<>(context.getCluster()), conf.keyspace).get();
+try
+{
+return context.getSidecarClient().tokenRangeReplicas(new 
ArrayList<>(context.getCluster()), conf.keyspace).get();
+}
+catch (ExecutionException | InterruptedException exception)
+{
+LOGGER.error("Failed to get token ranges", exception);
+throw new SidecarApiCallException("Failed to get token ranges", 
exception);

Review Comment:
   done






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-08-29 Thread via GitHub


yifan-c commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1737158816


##
cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/BinaryTraits.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data.converter.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.jetbrains.annotations.NotNull;
+
+public interface BinaryTraits extends SparkType

Review Comment:
   I was confused by the name. At first sight, I thought it was Scala source 
   code, since `trait` is Scala's interface. 
   Can you please not use the word `trait` for a Java interface? This patch 
   adds many traits.



##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java:
##
@@ -152,151 +153,153 @@ public void testUUID()
 public void testLong()
 {
 qt().forAll(TestUtils.bridges()).checkAssert(bridge -> {
-
assertTrue(bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE))
 instanceof Long);
-assertEquals(Long.MAX_VALUE, bridge.bigint().deserialize(
-ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE)));
+
assertTrue(bridge.bigint().deserializeToType(bridge.typeConverter(), 
bridge.bigint().serialize(Long.MAX_VALUE)) instanceof Long);
+assertEquals(Long.MAX_VALUE, 
bridge.bigint().deserializeToType(bridge.typeConverter(),
+   
ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE)));
 qt().forAll(integers().all())
-.checkAssert(integer -> assertEquals((long) integer, 
bridge.bigint().deserialize(
-bridge.bigint().serialize((long) integer;
-assertEquals(Long.MAX_VALUE, 
bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE)));
-assertEquals(Long.MIN_VALUE, 
bridge.bigint().deserialize(bridge.bigint().serialize(Long.MIN_VALUE)));
+.checkAssert(integer -> assertEquals((long) integer, 
bridge.bigint().deserializeToType(bridge.typeConverter(),
+   
bridge.bigint().serialize((long) integer;
+assertEquals(Long.MAX_VALUE, 
bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MAX_VALUE)));
+assertEquals(Long.MIN_VALUE, 
bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MIN_VALUE)));
 qt().withExamples(MAX_TESTS)
 .forAll(longs().all())
-.checkAssert(aLong -> assertEquals(aLong, 
bridge.bigint().deserialize(
-bridge.bigint().serialize(aLong;
+.checkAssert(aLong -> assertEquals(aLong, 
bridge.bigint().deserializeToType(bridge.typeConverter(),
+   
 bridge.bigint().serialize(aLong;
 });
 }
 
 @Test
 public void testDecimal()
 {
 qt().forAll(TestUtils.bridges()).checkAssert(bridge -> {
-assertTrue(bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(500L))) 
instanceof Decimal);
-assertEquals(Decimal.apply(500), bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(500L;
-assertNotSame(Decimal.apply(501), bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.valueOf(500L;
-assertEquals(Decimal.apply(-1), bridge.decimal().deserialize(
-bridge.decimal().serialize(BigDecimal.val

Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-29 Thread via GitHub


yifan-c commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737326950


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -381,67 +392,68 @@ private InstanceAvailability determineInstanceAvailability(RingInstance instance

     private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        Map<String, Set<String>> writeReplicasByDC;
-        Map<String, Set<String>> pendingReplicasByDC;
-        Map<String, ReplicaMetadata> replicaMetadata;
-        Set<RingInstance> blockedInstances;
+        return getTokenRangeReplicas(this::getTokenRangesAndReplicaSets,
+                                     this::getPartitioner,
+                                     this::getReplicationFactor,
+                                     this::instanceIsBlocked);
+    }
+
+    @VisibleForTesting
+    static TokenRangeMapping<RingInstance> getTokenRangeReplicas(Supplier<TokenRangeReplicasResponse> topologySupplier,
+                                                                 Supplier<Partitioner> partitionerSupplier,
+                                                                 Supplier<ReplicationFactor> replicationFactorSupplier,
+                                                                 Predicate<RingInstance> blockedInstancePredicate)
+    {
+        long start = System.nanoTime();
+        TokenRangeReplicasResponse response = topologySupplier.get();
+        long elapsedTimeNanos = System.nanoTime() - start;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(),
+                                                                                                   response.replicaMetadata());
+        LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} milliseconds",
+                    tokenRangesByInstance.size(),
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos));
+
         Set<RingInstance> replacementInstances;
-        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
-        try
-        {
-            long start = System.nanoTime();
-            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
-            long elapsedTimeNanos = System.nanoTime() - start;
-            replicaMetadata = response.replicaMetadata();
-
-            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
-            LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} nanoseconds",
-                        tokenRangesByInstance.size(),
-                        elapsedTimeNanos);
-
-            replacementInstances = response.replicaMetadata()
-                                           .values()
-                                           .stream()
-                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name()))
-                                           .map(RingInstance::new)
-                                           .collect(Collectors.toSet());
-
-            blockedInstances = response.replicaMetadata()
+        replacementInstances = response.replicaMetadata()
                                        .values()
                                        .stream()
+                                       .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name()))
                                        .map(RingInstance::new)
-                                       .filter(this::instanceIsBlocked)
                                        .collect(Collectors.toSet());

-        Set<String> blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address())
-                                                 .collect(Collectors.toSet());
+        Set<RingInstance> blockedInstances;
+        blockedInstances = response.replicaMetadata()

Review Comment:
   done. 
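   For context, a hedged sketch of why the Supplier-based signature above helps: the static method can be exercised with canned inputs, no live cluster or Sidecar client needed. Types here are simplified stand-ins for TokenRangeReplicasResponse and RingInstance.
   ```java
   import java.util.function.Predicate;
   import java.util.function.Supplier;

   final class SupplierInjectionExample
   {
       // Mirrors the shape of the refactored method with simplified types:
       // dependencies come in as Supplier/Predicate, so a unit test can pass
       // canned values instead of this::getTokenRangesAndReplicaSets.
       static String describeTopology(Supplier<String> topologySupplier,
                                      Predicate<String> blockedInstancePredicate)
       {
           String topology = topologySupplier.get();
           return blockedInstancePredicate.test(topology) ? topology + " (blocked)" : topology;
       }

       public static void main(String[] args)
       {
           // In a test, pass fixed values instead of live lookups
           System.out.println(describeTopology(() -> "dc1: 3 replicas", t -> false));
       }
   }
   ```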






Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-29 Thread via GitHub


yifan-c commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737305375


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -478,21 +490,21 @@ private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse r
 // Filter writeReplica entries and the value replicaSet to only 
include those with pending replicas
 return writeReplicasByDC.entrySet()
 .stream()
-.filter(e -> e.getValue().stream()
+.filter(e -> e.getValue().stream() // todo: 
transformToHostWithoutPort is called twice for entries
   .anyMatch(v -> 
pendingReplicas.contains(transformToHostWithoutPort(v
 .collect(Collectors.toMap(Map.Entry::getKey,
   e -> 
e.getValue().stream()
 .filter(v -> 
pendingReplicas.contains(transformToHostWithoutPort(v)))
 
.collect(Collectors.toSet(;
 }
 
-private String transformToHostWithoutPort(String v)
+private static String transformToHostWithoutPort(String v)

Review Comment:
   Talked offline; decided not to touch it, as it is unrelated to the patch.
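   Still, a hedged sketch of the `called twice` note in the hunk above: filter each replica set once and drop empty results afterwards, so the host transformation runs only once per entry. Names are illustrative, not the project's code.
   ```java
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   final class SinglePassFilterExample
   {
       // isPending encapsulates the transformToHostWithoutPort lookup, so each
       // host is transformed and checked exactly once per entry.
       static Map<String, Set<String>> pendingByDc(Map<String, Set<String>> writeReplicasByDc,
                                                   Predicate<String> isPending)
       {
           return writeReplicasByDc.entrySet().stream()
                                   .map(e -> Map.entry(e.getKey(), e.getValue().stream()
                                                                    .filter(isPending)
                                                                    .collect(Collectors.toSet())))
                                   .filter(e -> !e.getValue().isEmpty()) // drop DCs with no pending replicas
                                   .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
   }
   ```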






Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-29 Thread via GitHub


JeetKunDoug commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737113324


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##
@@ -164,7 +164,13 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
 this.skipExtendedVerify = MapUtils.getBoolean(options, 
WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
   "skip extended 
verification of SSTables by Cassandra");
 this.consistencyLevel = 
ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, 
WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
-this.localDC = MapUtils.getOrDefault(options, 
WriterOptions.LOCAL_DC.name(), null);
+String dc = MapUtils.getOrDefault(options, 
WriterOptions.LOCAL_DC.name(), null);

Review Comment:
   👍 



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -381,67 +392,68 @@ private InstanceAvailability determineInstanceAvailability(RingInstance instance

     private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        Map<String, Set<String>> writeReplicasByDC;
-        Map<String, Set<String>> pendingReplicasByDC;
-        Map<String, ReplicaMetadata> replicaMetadata;
-        Set<RingInstance> blockedInstances;
+        return getTokenRangeReplicas(this::getTokenRangesAndReplicaSets,
+                                     this::getPartitioner,
+                                     this::getReplicationFactor,
+                                     this::instanceIsBlocked);
+    }
+
+    @VisibleForTesting
+    static TokenRangeMapping<RingInstance> getTokenRangeReplicas(Supplier<TokenRangeReplicasResponse> topologySupplier,
+                                                                 Supplier<Partitioner> partitionerSupplier,
+                                                                 Supplier<ReplicationFactor> replicationFactorSupplier,
+                                                                 Predicate<RingInstance> blockedInstancePredicate)
+    {
+        long start = System.nanoTime();
+        TokenRangeReplicasResponse response = topologySupplier.get();
+        long elapsedTimeNanos = System.nanoTime() - start;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(),
+                                                                                                   response.replicaMetadata());
+        LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} milliseconds",
+                    tokenRangesByInstance.size(),
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos));
+
         Set<RingInstance> replacementInstances;
-        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
-        try
-        {
-            long start = System.nanoTime();
-            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
-            long elapsedTimeNanos = System.nanoTime() - start;
-            replicaMetadata = response.replicaMetadata();
-
-            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
-            LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} nanoseconds",
-                        tokenRangesByInstance.size(),
-                        elapsedTimeNanos);
-
-            replacementInstances = response.replicaMetadata()
-                                           .values()
-                                           .stream()
-                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name()))
-                                           .map(RingInstance::new)
-                                           .collect(Collectors.toSet());
-
-            blockedInstances = response.replicaMetadata()
+        replacementInstances = response.replicaMetadata()
                                        .values()
                                        .stream()
+                                       .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name()))
                                        .map(RingInstance::new)
-                                       .filter(this::instanceIsBlocked)
                                        .collect(Collectors.toSet());

-        Set<String> blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address())
-                                                 .collect(Collectors.toSet());
+        Set<RingInstance> blockedInstances;
+        blockedInstances = response.replicaMetadata()

Review Comment:
   NIT: Since we're iterating over the entire `replicaMetadata()` collection 
and converting them all to `RingInstance` instances here, maybe move this up 
above the `replacementInstances` assignment and hold on to the complete list of 
instances? That way we only create them once, and can get rid of the 
`.values().stream()` duplication, and can filter on the instance state in the 
RingInstance rather than doing t

Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-29 Thread via GitHub


frankgh commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737165237


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -478,21 +490,21 @@ private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse r
 // Filter writeReplica entries and the value replicaSet to only 
include those with pending replicas
 return writeReplicasByDC.entrySet()
 .stream()
-.filter(e -> e.getValue().stream()
+.filter(e -> e.getValue().stream() // todo: 
transformToHostWithoutPort is called twice for entries
   .anyMatch(v -> 
pendingReplicas.contains(transformToHostWithoutPort(v
 .collect(Collectors.toMap(Map.Entry::getKey,
   e -> 
e.getValue().stream()
 .filter(v -> 
pendingReplicas.contains(transformToHostWithoutPort(v)))
 
.collect(Collectors.toSet(;
 }
 
-private String transformToHostWithoutPort(String v)
+private static String transformToHostWithoutPort(String v)

Review Comment:
   not ipv6 friendly?
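   To illustrate the concern (a sketch, not the project's transformToHostWithoutPort): stripping a port by cutting at the last `:` mishandles bracketless IPv6 literals.
   ```java
   final class HostPortExample
   {
       // Naive approach: drop everything after the last ':'
       static String naiveStripPort(String hostAndPort)
       {
           int idx = hostAndPort.lastIndexOf(':');
           return idx < 0 ? hostAndPort : hostAndPort.substring(0, idx);
       }

       public static void main(String[] args)
       {
           System.out.println(naiveStripPort("10.0.0.1:9042"));      // 10.0.0.1 -- fine
           System.out.println(naiveStripPort("[2001:db8::1]:9042")); // [2001:db8::1] -- fine with brackets
           System.out.println(naiveStripPort("2001:db8::1"));        // 2001:db8: -- truncates a plain IPv6 address
       }
   }
   ```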



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -227,10 +230,18 @@ protected String getCurrentKeyspaceSchema() throws 
Exception
 return schemaResponse.schema();
 }
 
-private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() throws 
ExecutionException, InterruptedException
+private TokenRangeReplicasResponse getTokenRangesAndReplicaSets()
 {
 CassandraContext context = getCassandraContext();
-return context.getSidecarClient().tokenRangeReplicas(new 
ArrayList<>(context.getCluster()), conf.keyspace).get();
+try
+{
+return context.getSidecarClient().tokenRangeReplicas(new 
ArrayList<>(context.getCluster()), conf.keyspace).get();
+}
+catch (ExecutionException | InterruptedException exception)
+{
+LOGGER.error("Failed to get token ranges", exception);
+throw new SidecarApiCallException("Failed to get token ranges", 
exception);

Review Comment:
   can we add the keyspace information here?






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-08-29 Thread via GitHub


yifan-c commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1737138827


##
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##
@@ -473,6 +462,11 @@ public void write(Kryo kryo, Output output, CqlField field)
 
 public static UnsupportedOperationException notImplemented(CqlType type)
 {
-return new UnsupportedOperationException(type.toString() + " type not 
implemented");
+return notImplemented(type.toString());
+}
+
+public static UnsupportedOperationException notImplemented(String type)
+{
+return new UnsupportedOperationException(type + " type not 
implemented");
 }

Review Comment:
   Not a blocker; I just want to avoid any confusion. As long as they are used only in the context of "type", it is good.






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-08-29 Thread via GitHub


jberragan commented on PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#issuecomment-2318616264

   Latest CircleCI: 
https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/73/workflows/3f5e406a-98cd-44e0-9b2a-7001975742e6





Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-08-29 Thread via GitHub


Rikkuru commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2317467101

   I tested retries with `gocql.DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: []gocql.Consistency{gocql.One}}` and CL=Quorum on 2 nodes, one of which is down. The first attempt gets an error (read and write queries get the same error).
   This is logged in the observer on the first attempt:
   ```
   gogateway[12476]: 12:14:38.938400 error: scylla [4055867439]: pool [3]: host 
[runner1***]: attempt [0]: read query failed: Cannot achieve consistency level 
for cl QUORUM. Requires 2, alive 1
   ```
   and on retry the observer logs an attempt with the lower CL (this one succeeds):
   ```
   gogateway[12476]: 12:14:38.944768 warn: scylla [4055867439]: pool [3]: host 
[runner1***]: attempt [1]: read CL downgrade QUORUM -> ONE
   ```
   
   Our Query does not set Idempotent, so it should not have been retried:
   ```
   func (r *Reader) initQuery(req ReadRequestInterface) *gocql.Query {
q := r.session.Query(getQueryString(req))
q.Observer(r)
q.Prefetch(*r.cfg.ReadPrefetchFactor)
q.PageSize(*r.cfg.PageSize)
q.RetryPolicy(r.retryPolicy)
   
q.SetConsistency(r.consistencyLevel)
return q
   }
   ```
   We also don't change DefaultIdempotence in the cluster config.





Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-08-29 Thread via GitHub


Rikkuru commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2317407935

   > `Query failed: Operation timed out for system_schema.keyspaces - received 
only 1 responses from 3 CL=THREE.`, tested with `cluster.RetryPolicy = 
&gocql.DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: 
[]gocql.Consistency{gocql.Two, gocql.One}}` and `cluster.Consistency = 
gocql.Three`.
   
   Was this a write or a read query? Just checking, because a write timeout is not retried in DowngradingConsistencyRetryPolicy if some nodes responded.
   





Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-08-29 Thread via GitHub


OleksiienkoMykyta commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2317300586

   I have tested it and found that queries are not idempotent by default (`IsIdempotent = false`) unless you set the flag explicitly. Also, I didn't find any retries, just a single error when the consistency level cannot be met: `Query failed: Operation timed out for system_schema.keyspaces - received only 1 responses from 3 CL=THREE.`, tested with `cluster.RetryPolicy = &gocql.DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: []gocql.Consistency{gocql.Two, gocql.One}}` and `cluster.Consistency = gocql.Three`.
   It looks like everything works according to the spec, but maybe I missed something.
   Please provide more details on how you are getting retries (a code example or similar); it would also be nice if you could share the logs.
   





Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-28 Thread via GitHub


yifan-c commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1735348059


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -56,35 +56,38 @@ default void 
ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor,
 cl.name() + " only make sense for 
NetworkTopologyStrategy keyspaces");
 }
 
+default int quorum(Set group)
+{
+return group.size() / 2 + 1;
+}
+
     /**
-     * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor.
-     * <p>
-     * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes.
-     *   This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy.
-     * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy
-     *   (and non-pending) nodes.
-     *
-     *   For both the above cases, blocked instances are also considered as failures while performing consistency checks.
-     *   Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances.
-     *   This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes.
-     * </p>
+     * Check if the consistency guarantee is maintained for write
+     * <p>
+     * The check takes the writeReplicas, pendingReplicas and failedReplicas of the token range into consideration.
+     * The input parameters are expected to be prefiltered by datacenter,
+     * making the implementations of QUORUM, LOCAL_QUORUM and EACH_QUORUM identical.
+     * When pendingReplicas is non-empty, minimum number of success is increased by the size of pendingReplicas,
+     * keeping the same semantics defined in Cassandra.
+     * See <a href="https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/ConsistencyLevel.java#L172">blockForWrite</a>
+     * <p>
+     * For example, say RF == 3, and there is 2 pending replicas.
+     * <ul>
+     *     <li>QUORUM write consistency requires at least 4 replicas to succeed, i.e. quorum(3) + 2, tolerating 1 failure</li>
+     *     <li>ONE write consistency requires at least 3 replicas to succeed, i.e. 1 + 2, tolerating 2 failure</li>
+     *     <li>TWO write consistency requires at least 4 replicas to succeed, i.e. 2 + 2, tolerating 1 failure</li>
+     * </ul>
     *
- * @param writeReplicasthe set of replicas for write operations
- * @param pendingReplicas  the set of replicas pending status
- * @param replacementInstances the set of instances that are replacing the 
other instances
- * @param blockedInstances the set of instances that have been blocked 
for the bulk operation
- * @param failedInstanceIpsthe set of instances where there were 
failures
- * @param localDC  the local datacenter used for consistency 
level, or {@code null} if not provided
- * @return {@code true} if the consistency has been met, {@code false} 
otherwise
+ * @param writeReplicasthe set of write replicas. The set includes 
the pending replicas
+ * @param pendingReplicas  the set of pending replicas, which 
increases the minimum number of success required
+ * @param failedReplicas   the set of replicas fail to write to
+ * @return true if the write consistency has been met, false otherwise
  */
-boolean checkConsistency(Set writeReplicas,
- Set pendingReplicas,
- Set replacementInstances,
- Set blockedInstances,
- Set failedInstanceIps,
- String localDC); // todo: simplify the parameter 
list. not all are required in impl
+boolean checkWriteConsistency(Set writeReplicas,
+  Set pendingReplicas,
+  Set failedReplicas);

Review Comment:
   largely refactored the classes.
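   For readers following along, a small illustration of the arithmetic described in the javadoc above (mirroring Cassandra's blockForWrite semantics). This is a sketch of the math only, not the driver implementation.
   ```java
   final class WriteConsistencyMath
   {
       static int quorum(int replicas)
       {
           return replicas / 2 + 1;
       }

       // Minimum successful writes for a QUORUM-style level when some replicas are pending
       static int minimumSuccessForQuorum(int replicationFactor, int pendingReplicas)
       {
           return quorum(replicationFactor) + pendingReplicas;
       }

       public static void main(String[] args)
       {
           // RF == 3 with 2 pending replicas, as in the javadoc example:
           System.out.println(minimumSuccessForQuorum(3, 2)); // 4 -> tolerates 1 failure out of 5 write replicas
       }
   }
   ```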






Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-28 Thread via GitHub


yifan-c commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1735137494


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java:
##
@@ -82,34 +86,41 @@ public static Collection<Object[]> data()
         return clsToFailures.stream().map(List::toArray).collect(Collectors.toList());
     }
 
-    private void setup(ConsistencyLevel.CL consistencyLevel, List<Integer> failuresPerDc)
+    private void setup(ConsistencyLevel.CL consistencyLevel)
 {
 digestAlgorithm = new XXHash32DigestAlgorithm();
 tableWriter = new MockTableWriter(folder);
 writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, 
"cassandra-4.0.0", consistencyLevel);
 transportContext = (TransportContext.DirectDataBulkWriterContext) 
writerContext.transportContext();
 }
 
+@Test
+public void test() throws IOException, ExecutionException, 
InterruptedException

Review Comment:
   It was there to debug a specific input set. Removed it. 
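   A hedged JUnit 5 sketch of the alternative: to debug one input set, temporarily narrow the parameterized source instead of keeping a one-off `@Test`. Names and inputs below are illustrative only.
   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.CsvSource;
   import static org.junit.jupiter.api.Assertions.assertEquals;

   class QuorumMathTest
   {
       static int quorum(int replicas)
       {
           return replicas / 2 + 1;
       }

       @ParameterizedTest
       // Full matrix; comment out all but one row while debugging a failure
       @CsvSource({"3, 2", "5, 3", "1, 1"})
       void quorumIsMajority(int replicas, int expected)
       {
           assertEquals(expected, quorum(replicas));
       }
   }
   ```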






Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-28 Thread via GitHub


yifan-c commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1735122562


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -56,35 +56,38 @@ default void 
ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor,
 cl.name() + " only make sense for 
NetworkTopologyStrategy keyspaces");
 }
 
+default int quorum(Set group)

Review Comment:
   Yeah, it was left over from the refactoring and I forgot to remove it.






Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]

2024-08-28 Thread via GitHub


jberragan commented on code in PR #71:
URL: 
https://github.com/apache/cassandra-analytics/pull/71#discussion_r1734986657


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java:
##
@@ -325,7 +329,10 @@ private void maybeRebuildClusteringKeys(@NotNull 
ByteBuffer columnNameBuf)
 {
 Object newObj = deserialize(field, 
ByteBufferUtils.extractComponent(columnNameBuf, index++));
 Object oldObj = values[field.position()];
-if (newRow || oldObj == null || newObj == null || 
!field.equals(newObj, oldObj))
+// Historically, we compare equality of clustering keys using the 
Spark types
+// to determine if we have moved to a new 'row'. We could also 
compare using the Cassandra types
+// or the raw ByteBuffers before converting to Spark types - this might be slightly more performant.
+if (newRow || oldObj == null || newObj == null || 
!sparkSqlTypeConverter.toSparkType(field.type()).equals(newObj, oldObj))

Review Comment:
   TODO: convert CqlType to SparkType once in the constructor and re-use.
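   A hedged sketch of that TODO, with simplified stand-in types for CqlField/SparkType: perform the conversion once, up front, and reuse it on every comparison.
   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.function.Function;

   final class PrecomputedConversionExample<K, V>
   {
       private final Map<K, V> converted = new HashMap<>();

       PrecomputedConversionExample(List<K> fields, Function<K, V> convert)
       {
           // Convert each field exactly once in the constructor,
           // instead of inside the hot row-comparison loop
           for (K field : fields)
           {
               converted.put(field, convert.apply(field));
           }
       }

       V convertedType(K field)
       {
           return converted.get(field);
       }
   }
   ```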






Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]

2024-08-28 Thread via GitHub


JeetKunDoug commented on code in PR #75:
URL: 
https://github.com/apache/cassandra-analytics/pull/75#discussion_r1731504484


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -381,67 +383,79 @@ private InstanceAvailability determineInstanceAvailability(RingInstance instance

     private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        Map<String, Set<String>> writeReplicasByDC;
-        Map<String, Set<String>> pendingReplicasByDC;
-        Map<String, ReplicaMetadata> replicaMetadata;
-        Set<RingInstance> blockedInstances;
+        Supplier<TokenRangeReplicasResponse> topologySupplier = () -> {
+            try
+            {
+                return getTokenRangesAndReplicaSets();
+            }
+            catch (ExecutionException | InterruptedException exception)
+            {
+                LOGGER.error("Failed to get token ranges, ", exception);
+                throw new RuntimeException(exception);

Review Comment:
   I'd recommend moving the try/catch into `getTokenRangesAndReplicaSets` and 
throwing a new exception extending `AnalyticsException` here - that way, in the 
call itself, you can just pass `this::getTokenRangesAndReplicaSets` as the 
first parameter, which makes this feel a bit more consistent with the rest of 
the code.



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java:
##
@@ -117,6 +109,21 @@ public static void updateFailureHandler(CommitResult 
commitResult,
 });
 }
 
+    private static void logFailedRanges(Logger logger, String phase,
+                                        List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges)
+    {
+        for (ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange failedRange : failedRanges)
+        {
+            for (RingInstance instance : failedRange.failuresPerInstance.instances())
+            {
+                logger.error("Failed in phase {} for {} on {}",

Review Comment:
   I know this was just a refactoring of code, but logging the **reason** for 
failure here would be super-useful.
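   A hedged sketch of what that could look like; the map of instance to reason below is a simplified stand-in for the failure handler's real bookkeeping.
   ```java
   import java.util.Map;
   import org.slf4j.Logger;

   final class FailureReasonLoggingExample
   {
       // instanceToReason pairs each failed instance with why it failed,
       // e.g. the message of the exception captured by the failure handler
       static void logFailedRange(Logger logger, String phase, String range, Map<String, String> instanceToReason)
       {
           instanceToReason.forEach((instance, reason) ->
                   logger.error("Failed in phase {} for {} on {}: {}", phase, range, instance, reason));
       }
   }
   ```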



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -56,35 +56,38 @@ default void 
ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor,
 cl.name() + " only make sense for 
NetworkTopologyStrategy keyspaces");
 }
 
+default int quorum(Set group)

Review Comment:
   This is unused, as the static `quorum` method below is the only usage of 
`quorum` in the codebase.



##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java:
##
@@ -82,34 +86,41 @@ public static Collection<Object[]> data()
         return clsToFailures.stream().map(List::toArray).collect(Collectors.toList());
     }
 
-    private void setup(ConsistencyLevel.CL consistencyLevel, List<Integer> failuresPerDc)
+    private void setup(ConsistencyLevel.CL consistencyLevel)
 {
 digestAlgorithm = new XXHash32DigestAlgorithm();
 tableWriter = new MockTableWriter(folder);
 writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, 
"cassandra-4.0.0", consistencyLevel);
 transportContext = (TransportContext.DirectDataBulkWriterContext) 
writerContext.transportContext();
 }
 
+@Test
+public void test() throws IOException, ExecutionException, 
InterruptedException

Review Comment:
   Why does this test exist separately from the parameterized one? Can you please add some documentation and a more descriptive method name than `test` here, so that if it fails it is clear what it is testing and why it is different from the other tests?



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -56,35 +56,38 @@ default void 
ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor,
 cl.name() + " only make sense for 
NetworkTopologyStrategy keyspaces");
 }
 
+default int quorum(Set group)
+{
+return group.size() / 2 + 1;
+}
+
     /**
-     * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor.
-     * <p>
-     * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes.
-     *   This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy.
-     * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy
-     *   (and non-pending) nodes.
-     *
-     *   For both the above cases, blocked instances are also considered as failures while performing consistency checks.
-     *   Write replicas are adjusted to exc

Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]

2024-08-28 Thread via GitHub


Rikkuru commented on issue #1803:
URL: 
https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2315267132

   > Do you expect the retry of non-idempotent queries or not? I
   
   I expect that a https://pkg.go.dev/github.com/gocql/gocql#Query whose [IsIdempotent](https://github.com/gocql/gocql/blob/v1.6.0/session.go#L1221) method returns false will not be retried.
   You are right that it is probably better to check it outside of the retry policy, but I am sure it is not checked in gocql v1.6.0.
   
   > The ticket you mentioned was implementing speculative retry policy, which 
is something different.
   
   From issue https://github.com/apache/cassandra-gocql-driver/issues/1153 I understood that https://github.com/apache/cassandra-gocql-driver/issues/1083 contains the work on speculative execution and https://github.com/apache/cassandra-gocql-driver/issues/1154 was supposed to add support for non-idempotent queries in retries? Otherwise issue https://github.com/apache/cassandra-gocql-driver/issues/1153 should not have been closed.
   
   > For write operations it depends on idempotency
   
   I am not sure we should think too much about read/write ops here; gocql's Query is the same for both and does not provide a convenient way to check whether a query writes or reads data. I think gocql should just check the IsIdempotent method and trust the user to set this field correctly.
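   To make the requested gate concrete, a language-agnostic sketch (written in Java here, since it is only the shape of the check; the names are illustrative and not gocql's API): consult the query's idempotence flag before the retry policy is even asked.
   ```java
   interface RetryPolicy
   {
       boolean attempt(int attemptNumber); // true if another attempt is allowed
   }

   final class RetryGateExample
   {
       // Non-idempotent queries are never retried, regardless of what the policy says
       static boolean shouldRetry(boolean isIdempotent, RetryPolicy policy, int attemptNumber)
       {
           return isIdempotent && policy.attempt(attemptNumber);
       }
   }
   ```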
   


  1   2   3   4   5   6   7   8   9   10   >