Re: [I] Feature Request: Support Vector Type [cassandra-gocql-driver]
tengu-alt commented on issue #1734: URL: https://github.com/apache/cassandra-gocql-driver/issues/1734#issuecomment-2401579178 > > > @tengu-alt, did you make progress on vector support? I cannot find a relevant PR opened. Do you mind if I submit my nearly completed PoC? Based on your comment from 22 May, I think you are encoding the length of vector elements incorrectly. > > > > > > I am currently implementing vector type support. I think the error that I mentioned is caused by cqlsh working incorrectly. > > It looks like @lukasz-antoniak already has a functioning prototype; depending on how much progress you have made on your work, it might be more efficient to just have him open a PR with his work. Sounds great! I would be glad to see @lukasz-antoniak's PR. So far, I have implemented vector unmarshalling for nearly all data types supported by the driver (vector data uses a different serialization). Marshalling still remains to be done, as does test coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
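The serialization difference mentioned in this thread can be illustrated for a fixed-size element type. For Cassandra 5.0's vector type, a `vector<float, n>` value body is the n elements written back to back, each a 4-byte big-endian IEEE 754 float, with no per-element length prefix; variable-length element types are serialized differently and are not covered here. The following Java sketch shows only that fixed-size case; `FloatVectorCodec`, `encode` and `decode` are hypothetical names, not part of gocql or any driver API.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper illustrating the body layout of a vector<float, n> value. */
final class FloatVectorCodec
{
    private FloatVectorCodec() {}

    /** Writes each element as a 4-byte big-endian float, back to back, with no length prefixes. */
    static byte[] encode(float[] vector)
    {
        ByteBuffer buf = ByteBuffer.allocate(Float.BYTES * vector.length).order(ByteOrder.BIG_ENDIAN);
        for (float f : vector)
        {
            buf.putFloat(f);
        }
        return buf.array();
    }

    /** Reads a body produced by encode(); the dimension comes from the column type, not from the bytes. */
    static float[] decode(byte[] body, int dimension)
    {
        if (body.length != Float.BYTES * dimension)
        {
            throw new IllegalArgumentException("expected " + Float.BYTES * dimension + " bytes, got " + body.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(body).order(ByteOrder.BIG_ENDIAN);
        float[] out = new float[dimension];
        for (int i = 0; i < dimension; i++)
        {
            out[i] = buf.getFloat();
        }
        return out;
    }
}
```

Because the element size is fixed, the body length alone (n times 4 bytes) is enough to validate the value, which is why no per-element length is written for this case.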
Re: [PR] CASSANDRA-19980: Remove SparkSQL dependency from CassandraBridge so t… [cassandra-analytics]
yifan-c commented on code in PR #88: URL: https://github.com/apache/cassandra-analytics/pull/88#discussion_r1792282366 ## cassandra-bridge/build.gradle: ## @@ -40,14 +40,32 @@ publishing { dependencies { api(project(':cassandra-analytics-common')) -api(project(':cassandra-analytics-spark-converter')) -compileOnly(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") -compileOnly(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") +implementation "org.slf4j:slf4j-api:${slf4jApiVersion}" +compileOnly "com.esotericsoftware:kryo-shaded:${kryoVersion}" +compileOnly "com.google.guava:guava:${guavaVersion}" +compileOnly "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" +compileOnly "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" +compileOnly "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + +testImplementation(project(path: ":cassandra-analytics-spark-converter")) +testImplementation(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") +testImplementation(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${jacksonVersion}") testImplementation(group: 'com.google.guava', name: 'guava', version: '31.1-jre') testImplementation(group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.26') testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}") testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}") testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}") } + +configurations { +testArtifacts +} +task testJar(type: Jar) { +archiveBaseName = "${project.name}-bridge-test" +from sourceSets.test.output +} +artifacts { 
+testArtifacts testJar +} Review Comment: How about using test fixtures instead? https://docs.gradle.org/current/userguide/java_testing.html#sec:java_test_fixtures ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/DataLayer.java: ## @@ -155,6 +157,11 @@ public CassandraVersion version() */ public abstract CassandraBridge bridge(); +public SparkSqlTypeConverter typeConverter() Review Comment: Add Javadoc. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java: ## @@ -75,12 +78,32 @@ public static CassandraBridge get(@NotNull CassandraVersionFeatures features) return get(getCassandraVersion(features)); } +@NotNull +public static SparkSqlTypeConverter getSparkSql(@NotNull CassandraVersionFeatures features) +{ +return getSparkSql(getCassandraVersion(features)); +} + Review Comment: Please place this method next to the other `getSparkSql` overloads. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java: ## @@ -33,12 +33,15 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter; import org.jetbrains.annotations.NotNull; public final class CassandraBridgeFactory { -private static final Map CASSANDRA_BRIDGES = new ConcurrentHashMap<>(CassandraVersion.values().length); +private static final Map> CASSANDRA_BRIDGES = Review Comment: nit: add a comment indicating what the String key represents. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java: ## @@ -130,14 +153,20 @@ private static String typesResourceName(@NotNull String label) return jarResourceName(label, "types"); } +@NotNull +private static String sparkSqlResourceName(@NotNull String label) +{ +return jarResourceName(label, "sparksql"); +} + private static String jarResourceName(String... 
parts) { return "/bridges/" + String.join("-", parts) + ".jar"; } @NotNull @SuppressWarnings("unchecked") -private static CassandraBridge create(@NotNull String label) +private static Pair create(@NotNull String label) Review Comment: The method signature looks a bit odd: `CassandraBridgeFactory#create` returns a pair. How about creating a dedicated data class, say `VersionedCassandraBridge`, that wraps both `CassandraBridge` and `SparkSqlTypeConverter`? Then the `create` and `get` methods can return the `VersionedCassandraBridge`, and `getSparkSql` may be removed. ```java public class VersionedCassandraBrid
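One possible shape for the suggested `VersionedCassandraBridge` is sketched below, together with a cache keyed by version label in the spirit of the `CASSANDRA_BRIDGES` map. This is an illustration under assumptions, not the code that was eventually merged: `CassandraBridge` and `SparkSqlTypeConverter` are replaced by empty stand-in interfaces so the sketch compiles on its own, and `VersionedBridgeCache` and its loader function are hypothetical.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Stand-ins for the real cassandra-analytics types, so this sketch compiles on its own.
interface CassandraBridge {}

interface SparkSqlTypeConverter {}

/** Hypothetical value class pairing a bridge with the SparkSQL converter loaded for the same version. */
final class VersionedCassandraBridge
{
    final CassandraBridge bridge;
    final SparkSqlTypeConverter sparkSqlTypeConverter;

    VersionedCassandraBridge(CassandraBridge bridge, SparkSqlTypeConverter sparkSqlTypeConverter)
    {
        this.bridge = Objects.requireNonNull(bridge);
        this.sparkSqlTypeConverter = Objects.requireNonNull(sparkSqlTypeConverter);
    }
}

/** Hypothetical factory that caches one VersionedCassandraBridge per version label. */
final class VersionedBridgeCache
{
    // Key: the Cassandra version label that is also passed to jarResourceName(label, ...)
    private final Map<String, VersionedCassandraBridge> bridges = new ConcurrentHashMap<>();
    private final Function<String, VersionedCassandraBridge> loader;

    VersionedBridgeCache(Function<String, VersionedCassandraBridge> loader)
    {
        this.loader = loader;
    }

    /** Loads the bridge and converter for a label at most once, then reuses the same instance. */
    VersionedCassandraBridge get(String label)
    {
        return bridges.computeIfAbsent(label, loader);
    }
}
```

Returning one value object from `create` and `get` keeps the bridge and its converter tied to the same version label, which is the main argument in the review comment for dropping the separate `getSparkSql` lookup.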
Re: [PR] CASSANDRA-19980: Remove SparkSQL dependency from CassandraBridge so t… [cassandra-analytics]
jberragan commented on PR #88: URL: https://github.com/apache/cassandra-analytics/pull/88#issuecomment-2400445626 Latest green CI: https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/118/workflows/1dc1d879-865f-44b3-8eec-7363529bf51f
Re: [I] Feature Request: Support Vector Type [cassandra-gocql-driver]
joao-r-reis commented on issue #1734: URL: https://github.com/apache/cassandra-gocql-driver/issues/1734#issuecomment-2400137486 > > @tengu-alt, did you make progress on vector support? I cannot find a relevant PR opened. Do you mind if I submit my nearly completed PoC? Based on your comment from 22 May, I think you are encoding the length of vector elements incorrectly. > > I am currently implementing vector type support. I think the error that I mentioned is caused by cqlsh working incorrectly. It looks like @lukasz-antoniak already has a functioning prototype; depending on how much progress you have made on your work, it might be more efficient to just have him open a PR with his work.
[PR] CEP-44 Kafka integration for Cassandra CDC using Sidecar [cassandra-analytics]
jberragan opened a new pull request, #87: URL: https://github.com/apache/cassandra-analytics/pull/87 This is the initial commit for CEP-44, which introduces a standalone CDC module into the Analytics project. This module provides the foundation for CDC in the Apache Cassandra Sidecar. It provides:
- a standalone Cdc class as the entry point for initializing CDC;
- pluggable interfaces for listing and reading commit log segments for a token range, persisting and reading CDC state, providing the Cassandra table schema, and optionally reading values from Cassandra;
- reading and deserializing commit log mutations;
- reconciling and de-duplicating mutations across replicas;
- serializing CDC state into a binary object for persistence;
- a layer for converting Cassandra mutations into a standard consumable format.
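The PR description does not say how the copies of a mutation read from different replicas are reconciled. One simple approach, sketched here purely as an assumption, is to key each mutation by an identity that compares equal across replicas and forward it exactly once, after a configurable number of copies has been seen. `MutationKey` and `ReplicaDeduplicator` are hypothetical; a real identity would also need the mutation contents (for example a digest), since table, partition key and timestamp alone need not be unique.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical mutation identity: the same write read from different replicas' commit logs compares equal. */
final class MutationKey
{
    final String table;
    final String partitionKey;
    final long timestampMicros;

    MutationKey(String table, String partitionKey, long timestampMicros)
    {
        this.table = table;
        this.partitionKey = partitionKey;
        this.timestampMicros = timestampMicros;
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof MutationKey))
        {
            return false;
        }
        MutationKey other = (MutationKey) o;
        return timestampMicros == other.timestampMicros
               && table.equals(other.table)
               && partitionKey.equals(other.partitionKey);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(table, partitionKey, timestampMicros);
    }
}

/** Hypothetical de-duplicator: forwards each mutation exactly once, when its minReplicas-th copy arrives. */
final class ReplicaDeduplicator
{
    private final int minReplicas;
    private final Consumer<MutationKey> downstream;
    private final Map<MutationKey, Integer> seen = new HashMap<>();

    ReplicaDeduplicator(int minReplicas, Consumer<MutationKey> downstream)
    {
        if (minReplicas < 1)
        {
            throw new IllegalArgumentException("minReplicas must be >= 1");
        }
        this.minReplicas = minReplicas;
        this.downstream = downstream;
    }

    void onMutation(MutationKey key)
    {
        int count = seen.merge(key, 1, Integer::sum);
        // Emit on exactly the minReplicas-th copy; earlier and later copies are dropped.
        if (count == minReplicas)
        {
            downstream.accept(key);
        }
    }
}
```

The in-memory map here grows without bound; a production version would presumably need to expire entries, for example once the commit log segments they came from have been fully processed.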
Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]
yifan-c merged PR #86: URL: https://github.com/apache/cassandra-analytics/pull/86
Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]
yifan-c commented on code in PR #86: URL: https://github.com/apache/cassandra-analytics/pull/86#discussion_r1774116231 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.token; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; +import org.apache.cassandra.spark.common.model.CassandraInstance; +import org.apache.cassandra.spark.common.model.NodeStatus; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.Nullable; + +/** + * ReplicaAwareFailureHandler for a single cluster + * The handler should be constructed by {@link MultiClusterReplicaAwareFailureHandler} only, hence package-private + * @param CassandraInstance type + */ +class SingleClusterReplicaAwareFailureHandler extends ReplicaAwareFailureHandler +{ +// failures captures per each range; note that failures do not necessarily fail a range, as long as consistency level is considered +@GuardedBy("this") +private final RangeMap rangeFailuresMap = TreeRangeMap.create(); + +@GuardedBy("this") +private boolean isEmpty = true; + +@Nullable +private String clusterId; + +SingleClusterReplicaAwareFailureHandler(Partitioner partitioner, String clusterId) +{ +this.clusterId = clusterId; +rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance()); +} + +/** + * Check whether the handler contains any failure + * @return true if there is at least a failure; false otherwise. 
+ */ +public boolean isEmpty() +{ +return isEmpty; +} + +@Override +public List.ConsistencyFailurePerRange> +getFailedRanges(TokenRangeMapping tokenRangeMapping, JobInfo job, ClusterInfo cluster) +{ +return getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor()); +} + +@Override +public synchronized void addFailure(Range tokenRange, I instance, String errMessage) +{ +RangeMap overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange); +RangeMap mappingsToAdd = TreeRangeMap.create(); + +for (Map.Entry, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet()) +{ +FailuresPerInstance newErrorMap = entry.getValue().copy(); +newErrorMap.addErrorForInstance(instance, errMessage); +mappingsToAdd.put(entry.getKey(), newErrorMap); +} +rangeFailuresMap.putAll(mappingsToAdd); +isEmpty = false; +} + +@Override +public synchronized Set getFailedInstances() +{ +if (isEmpty) +{ +return Collections.emptySet(); +} + +return rangeFailuresMap.asMapOfRanges() + .values() + .stream() + .map(FailuresPerInstance::instances) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); +} + +@Override +protected synchronized List.ConsistencyFailurePerRange> +getFailedRangesInternal(TokenRangeMapping tokenRangeMapping, +ConsistencyLevel cl, +@Nullable String localDC, +ReplicationFactor replicationFactor) +{ +Preconditions.checkArgument((
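The class comment notes that failures do not necessarily fail a range once the consistency level is taken into account. The arithmetic behind that can be sketched as follows, under simplifying assumptions: a single datacenter, a hypothetical `SimpleConsistencyLevel` enum rather than the project's `ConsistencyLevel`, and failures counted per distinct instance, so several failures on one replica count once (as the test name `testMultipleFailuresSingleInstanceSucceedRF3` in this PR suggests). A range survives when the number of failed replicas is at most RF minus the acknowledgements the level requires; with RF = 3, QUORUM needs 2 acks and therefore tolerates one failed replica.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Simplified, single-datacenter view of consistency levels (hypothetical; not the project's enum). */
enum SimpleConsistencyLevel
{
    ONE, TWO, THREE, QUORUM, ALL;

    /** Number of replica acknowledgements this level needs for the given replication factor. */
    int requiredAcks(int rf)
    {
        switch (this)
        {
            case ONE: return 1;
            case TWO: return 2;
            case THREE: return 3;
            case QUORUM: return rf / 2 + 1;
            case ALL: return rf;
            default: throw new AssertionError(this);
        }
    }
}

/** Hypothetical helper deciding whether a token range still meets its consistency level. */
final class RangeConsistencyCheck
{
    private RangeConsistencyCheck() {}

    /** Failures are counted per distinct instance, so repeated failures on one replica count once. */
    static boolean rangeSucceeds(int rf, SimpleConsistencyLevel cl, Collection<String> failedInstances)
    {
        Set<String> distinct = new HashSet<>(failedInstances);
        int tolerated = rf - cl.requiredAcks(rf);
        return distinct.size() <= tolerated;
    }
}
```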
Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]
yifan-c commented on code in PR #86: URL: https://github.com/apache/cassandra-analytics/pull/86#discussion_r1774114388 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java: ## @@ -0,0 +1,191 @@
Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]
frankgh commented on code in PR #86: URL: https://github.com/apache/cassandra-analytics/pull/86#discussion_r1774074153 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java: ## @@ -0,0 +1,191 @@
Re: [PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]
yifan-c commented on code in PR #86: URL: https://github.com/apache/cassandra-analytics/pull/86#discussion_r1773769751 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java: ## @@ -220,30 +161,7 @@ public void multiMapWorksWithRingInstances() assertEquals(1, newErrorMap.keySet().size()); } -@Test -public void testMultipleFailuresSingleInstanceSucceedRF3() Review Comment: This test has been moved into `SingleClusterReplicaAwareFailureHandlerTest`.
Re: [I] Donate GoCQL to Apache Cassandra [YOUR ACTION REQUIRED] [cassandra-gocql-driver]
bcrusu commented on issue #1751: URL: https://github.com/apache/cassandra-gocql-driver/issues/1751#issuecomment-2370352287 My contribution was minimal, but I'm happy to donate it. Apologies for the late reply!
Re: [PR] [CASSANDRA-19625] Initial Configuration for SonarCube Analysis [cassandra-analytics]
5 closed pull request #57: [CASSANDRA-19625] Initial Configuration for SonarCube Analysis URL: https://github.com/apache/cassandra-analytics/pull/57
Re: [I] marshalBigInt return 8 bytes slice in all cases except for big.Int, which returns a variable length slice [cassandra-gocql-driver]
OleksiienkoMykyta commented on issue #1740: URL: https://github.com/apache/cassandra-gocql-driver/issues/1740#issuecomment-2363705613 Hello, Felipe! This issue is caused by a difference between how Cassandra and the driver implement bigint. In Cassandra, bigint is limited to eight bytes, whereas gocql implemented it as an arbitrary-precision type. It's already fixed; thank you for reporting it. Have a great day!
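To make the constraint concrete: a CQL `bigint` is a 64-bit signed integer whose value body is always exactly 8 big-endian bytes, while arbitrary-precision integers belong to the variable-length `varint` type. The Java sketch below (hypothetical names, not gocql code) encodes a `BigInteger` as a `bigint` by range-checking it with `longValueExact()` instead of emitting the variable-length output of `BigInteger#toByteArray`.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

/** Hypothetical encoder showing that a CQL bigint body is always exactly 8 bytes. */
final class BigintCodec
{
    private BigintCodec() {}

    /** Encodes a 64-bit signed value as 8 big-endian two's-complement bytes. */
    static byte[] encode(long value)
    {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /**
     * Encodes an arbitrary-precision value as a bigint, rejecting values outside the 64-bit range
     * instead of producing a variable-length byte array as BigInteger#toByteArray would.
     */
    static byte[] encode(BigInteger value)
    {
        try
        {
            return encode(value.longValueExact());
        }
        catch (ArithmeticException e)
        {
            throw new IllegalArgumentException("value does not fit in a CQL bigint: " + value, e);
        }
    }
}
```

For example, `BigInteger.ONE.toByteArray()` is a single byte, whereas `BigintCodec.encode(BigInteger.ONE)` yields eight bytes ending in `0x01`, which is the fixed width a `bigint` column expects.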
[PR] CASSANDRA-19933: Support aggregated consistency validation for multiple clusters [cassandra-analytics]
yifan-c opened a new pull request, #86: URL: https://github.com/apache/cassandra-analytics/pull/86 Patch by Yifan Cai; Reviewed by TBD for CASSANDRA-19933
Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]
yifan-c merged PR #83: URL: https://github.com/apache/cassandra-analytics/pull/83
Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]
frankgh commented on code in PR #83: URL: https://github.com/apache/cassandra-analytics/pull/83#discussion_r1765416450 ## scripts/build-sidecar.sh: ## @@ -24,7 +24,7 @@ else SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; ) SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"; SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}" - SIDECAR_COMMIT="${SIDECAR_COMMIT:-55a9efee30555d3645680c6524043a6c9bc1194b}" + SIDECAR_COMMIT="${SIDECAR_COMMIT:-f07e248d0ce8303a06daf93b462190ef7be7304d}" Review Comment: any reason not to move to the current head of Sidecar? `7f8db256377ff4e449a83282b2561ce5e7b74adf` ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/StorageTransportHandler.java: ## @@ -89,6 +106,20 @@ private void updateCredentials(UUID jobId, StorageCredentialPair credentialPair) } } +private void sendCoordinationSignal(UUID jobId, RestoreJobStatus status) +{ +UpdateRestoreJobRequestPayload requestPayload = new UpdateRestoreJobRequestPayload(null, null, status, null); +try +{ +// TODO: send to all clusters\ + transportContext.dataTransferApi().updateRestoreJob(requestPayload); +} +catch (ClientException e) +{ +throw new RuntimeException("Failed to update secretes for restore job. restoreJobId: " + jobId, e); Review Comment: wrong exception verbiage here? ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinatedTransportExtension.java: ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.transports.storage.extensions; + +import org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation; + +/** + * Extension methods that enables coordinated write to multiple target clusters + * Package-private interface only to be extended by {@link StorageTransportExtension} + * + * Note that the methods defined in this extension run in Spark Driver only + * + * The coordinated write has 2 phases, i.e. staging phase and importing phase. In the happy path, the steps of a run are the following: + * + * Extension sets the {@link CoordinationSignalListener} on initialization. + * Extension invokes {@link CoordinationSignalListener#onStageReady(String)}, + * when it decides it is time to stage SSTables on all clusters. + * Cassandra Analytics calls Sidecars to stage data. + * {@link #onStageSucceeded(String, long, long, long)} is called per cluster to notify the extension. + * Extension invokes {@link CoordinationSignalListener#onApplyReady(String)}, + * when it decides it is time to apply/import SSTables on all clusters. + * ECassandra Analytics calls Sidecars to import data. Review Comment: ```suggestion * Cassandra Analytics calls Sidecars to import data. ``` ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.transports.storage.extensions; + +/** + * A listener interface that receives coordination signals. + * It works in cooperation with {@link CoordinatedTransportExtension} to
Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]
yifan-c commented on code in PR #83: URL: https://github.com/apache/cassandra-analytics/pull/83#discussion_r1765431422 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/transports/storage/extensions/CoordinationSignalListener.java: ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.transports.storage.extensions; + +/** + * A listener interface that receives coordination signals. + * It works in cooperation with {@link CoordinatedTransportExtension} to enable coordinated write Review Comment: the feature name is "coordinated write"
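The happy path in the `CoordinatedTransportExtension` Javadoc from PR #83 is a two-phase protocol. First the data is staged on every cluster, and then it is applied (imported) on every cluster. The following is a minimal model of that flow, built on simplifying assumptions. `StageListener` and `TwoPhaseCoordinator` are hypothetical stand-ins for `CoordinationSignalListener` and the extension. The rule "signal apply once every cluster has reported a successful stage" is an assumed policy, because the Javadoc leaves that decision to the extension.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for CoordinationSignalListener: receives the two phase signals.
interface StageListener
{
    void onStageReady(String jobId);

    void onApplyReady(String jobId);
}

// Hypothetical coordinator: tracks which clusters still have to finish staging
// and only signals the import (apply) phase once all of them have succeeded.
class TwoPhaseCoordinator
{
    private final Set<String> pendingClusters;
    private StageListener listener;

    TwoPhaseCoordinator(List<String> clusterIds)
    {
        this.pendingClusters = new LinkedHashSet<>(clusterIds);
    }

    // The listener is registered on initialization.
    void setListener(StageListener listener)
    {
        this.listener = listener;
    }

    // The coordinator decides it is time to stage SSTables on all clusters.
    void startStaging(String jobId)
    {
        listener.onStageReady(jobId);
    }

    // Called once per cluster after its Sidecars report that staging succeeded.
    void onStageSucceeded(String jobId, String clusterId)
    {
        pendingClusters.remove(clusterId);
        if (pendingClusters.isEmpty())
        {
            // Every cluster has staged, so it is now safe to import everywhere.
            listener.onApplyReady(jobId);
        }
    }
}
```

In this sketch, a single slow cluster holds back the import on every cluster. That is the intended effect of coordinating the write, rather than letting each cluster import independently.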
Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]
yifan-c commented on code in PR #83: URL: https://github.com/apache/cassandra-analytics/pull/83#discussion_r1765428720 ## scripts/build-sidecar.sh: ## @@ -24,7 +24,7 @@ else SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; ) SIDECAR_REPO="${SIDECAR_REPO:-https://github.com/apache/cassandra-sidecar.git}"; SIDECAR_BRANCH="${SIDECAR_BRANCH:-trunk}" - SIDECAR_COMMIT="${SIDECAR_COMMIT:-55a9efee30555d3645680c6524043a6c9bc1194b}" + SIDECAR_COMMIT="${SIDECAR_COMMIT:-f07e248d0ce8303a06daf93b462190ef7be7304d}" Review Comment: Thanks. I was not aware that there was a new commit.
Re: [PR] Ninja fix for CASSANDRA-19815 to publish jar for cassandra-analytics-… [cassandra-analytics]
yifan-c closed pull request #85: Ninja fix for CASSANDRA-19815 to publish jar for cassandra-analytics-… URL: https://github.com/apache/cassandra-analytics/pull/85
[PR] Ninja fix for CASSANDRA-19815 to publish jar for cassandra-analytics-… [cassandra-analytics]
jberragan opened a new pull request, #85: URL: https://github.com/apache/cassandra-analytics/pull/85 …spark-converter
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c merged PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
JeetKunDoug commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1765194312 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java: ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter; + +import java.math.BigInteger; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import org.junit.jupiter.api.Test; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.exception.TimeSkewTooLargeException; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CassandraClusterInfoTest +{ +@Test +void testTimeSkewAcceptable() +{ +Instant localNow = Instant.now(); +int allowanceMinutes = 10; +Instant remoteNow = localNow.plus(Duration.ofMinutes(1)); +CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow); + ci.validateTimeSkewWithLocalNow(Range.openClosed(BigInteger.valueOf(10), BigInteger.valueOf(20)), localNow); +} + +@Test +void testTimeSkewTooLarge() +{ +Instant localNow = Instant.now(); +int allowanceMinutes = 10; +Instant remoteNow = localNow.plus(Duration.ofMinutes(11)); // 11 > allowanceMinutes +CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow); +assertThatThrownBy(() -> ci.validateTimeSkewWithLocalNow(Range.openClosed(BigInteger.valueOf(10), BigInteger.valueOf(20)), localNow)) +.isExactlyInstanceOf(TimeSkewTooLargeException.class); +} Review Comment: NIT: I'm not crazy about tests that have no asserts, even if it's really just "this doesn't throw" - a suggestion to make the test just a bit more explicit about what it's testing: ```suggestion public static final Range RANGE = Range.openClosed(BigInteger.valueOf(10), 
BigInteger.valueOf(20)); @Test void testTimeSkewAcceptable() { Instant localNow = Instant.now(); int allowanceMinutes = 10; Instant remoteNow = localNow.plus(Duration.ofMinutes(1)); CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow); assertThatNoException().describedAs("Acceptable time skew should validate without exception") .isThrownBy(() -> ci.validateTimeSkewWithLocalNow(RANGE, localNow)); } @Test void testTimeSkewTooLarge() { Instant localNow = Instant.now(); int allowanceMinutes = 10; Instant remoteNow = localNow.plus(Duration.ofMinutes(11)); // 11 > allowanceMinutes CassandraClusterInfo ci = mockClusterInfoForTimeSkewTest(allowanceMinutes, remoteNow); assertThatException().describedAs("Time skew with too large a value should throw TimeSkewTooLargeException") .isThrownBy(() -> ci.validateTimeSkewWithLocalNow(RANGE, localNow)) .isExactlyInstanceOf(TimeSkewTooLargeException.class); } ```
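The two tests pin down a simple rule. With a 10-minute allowance, a remote clock that is 1 minute ahead is accepted, and one that is 11 minutes ahead is rejected. The following is a minimal sketch of that check. It assumes that the skew is compared as an absolute difference, so a remote clock that is behind is treated the same way as one that is ahead, and that a skew exactly equal to the allowance is accepted. `TimeSkewCheck` and `validate` are hypothetical, and `IllegalStateException` stands in for `TimeSkewTooLargeException`. This is not the code in `CassandraClusterInfo`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating the time-skew rule exercised by the tests.
final class TimeSkewCheck
{
    private TimeSkewCheck()
    {
    }

    // Throws if the remote clock differs from localNow by more than allowanceMinutes,
    // whether the remote clock is ahead of or behind the local one.
    static void validate(Instant localNow, Instant remoteNow, int allowanceMinutes)
    {
        Duration skew = Duration.between(localNow, remoteNow).abs();
        if (skew.compareTo(Duration.ofMinutes(allowanceMinutes)) > 0)
        {
            throw new IllegalStateException("Time skew of " + skew.toMinutes()
                                            + " min exceeds the allowance of " + allowanceMinutes + " min");
        }
    }
}
```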
Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]
frankgh merged PR #84: URL: https://github.com/apache/cassandra-analytics/pull/84
Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]
jberragan commented on PR #84: URL: https://github.com/apache/cassandra-analytics/pull/84#issuecomment-2356869431 Green CI: https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/107/workflows/9cb134d8-67c2-404b-b4f8-88bacbef36dc
Re: [I] Trying to get cassandra active, idle, inUse connection [cassandra-gocql-driver]
md-robiul-hassan-kr commented on issue #1774: URL: https://github.com/apache/cassandra-gocql-driver/issues/1774#issuecomment-2356860789 Hi @testisnullus, your assumption is correct. I want to know how many active, in-use, or idle connections I have to the database at any given time, so that I can use those metrics. A good example is [here](https://docs.datastax.com/en/developer/java-driver/4.9/manual/core/metrics/index.html), where those metrics are available through the JVM, and I was wondering whether something similar could be made available in the gocql library. ``` datastax-java-driver.advanced.metrics { session.enabled = [ connected-nodes, cql-requests ] node.enabled = [ pool.open-connections, pool.in-flight ] } ``` I would mostly like to get connection pool information, so that it can be used in Grafana with Prometheus.
Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]
yifan-c commented on code in PR #84: URL: https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763758614 ## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java: ## @@ -59,15 +64,32 @@ public class SSTableCache propOrDefault("sbr.cache.stats.expireAfterMins", 60)); private final Cache filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384), propOrDefault("sbr.cache.filter.expireAfterMins", 60)); +private final Cache> compressionMetadata = buildCache(propOrDefault("sbr.cache.compressionInfo.maxEntries", 128), Review Comment: Looking at the GitHub comment now, I realize it could be confusing. I meant to ask you to add a Java code comment that explains the cache value.
Re: [PR] CASSANDRA-19927: Remove old compression cache and move to using cache of Compressio… [cassandra-analytics]
yifan-c commented on code in PR #84: URL: https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763750927 ## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java: ## @@ -20,10 +20,13 @@ package org.apache.cassandra.spark.reader; import java.io.IOException; +import java.io.InputStream; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import com.google.common.cache.Cache; Review Comment: As a potential follow-up, can we migrate to Caffeine for caching? ## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java: ## @@ -59,15 +64,32 @@ public class SSTableCache propOrDefault("sbr.cache.stats.expireAfterMins", 60)); private final Cache filter = buildCache(propOrDefault("sbr.cache.filter.maxEntries", 16384), propOrDefault("sbr.cache.filter.expireAfterMins", 60)); +private final Cache> compressionMetadata = buildCache(propOrDefault("sbr.cache.compressionInfo.maxEntries", 128), Review Comment: Can you comment why the cache value is `Optional`?
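The thread does not say why the cache value is `Optional`. One likely reason, offered here as an assumption, is that an uncompressed SSTable has no CompressionInfo component, so the loader has nothing to return. The `@Nullable` result of `compressionMetaData` elsewhere in this PR is consistent with that reading. Guava caches do not store `null` values: a loader that returns `null` fails with `InvalidCacheLoadException`. Wrapping the result in `Optional` therefore lets "no compression metadata" be cached like any other value. The sketch below shows the same effect with the JDK's `ConcurrentHashMap`, whose `computeIfAbsent` records no mapping when the function returns `null`. `OptionalValueCache` and its loader are hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache that, like a Guava Cache, cannot hold null values.
// Wrapping each loaded value in Optional lets a "missing" result be cached as well.
final class OptionalValueCache<K, V>
{
    private final ConcurrentHashMap<K, Optional<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // may return null, e.g. for an uncompressed SSTable
    final AtomicInteger loads = new AtomicInteger();

    OptionalValueCache(Function<K, V> loader)
    {
        this.loader = loader;
    }

    // Returns the cached value, or null if the loader found nothing. The loader runs
    // at most once per key, because Optional.empty() is cached like any other value.
    V get(K key)
    {
        return cache.computeIfAbsent(key, k -> {
            loads.incrementAndGet();
            return Optional.ofNullable(loader.apply(k));
        }).orElse(null);
    }
}
```

Without the `Optional` wrapper, every lookup for an uncompressed SSTable would miss the cache and run the loader again.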
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763681330 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroupTest.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils; +import org.apache.cassandra.spark.bulkwriter.WriteAvailability; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.partitioner.Partitioner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class CassandraClusterInfoGroupTest +{ +@Test +void testCreateGroupFailWithEmptyList() +{ +assertThatThrownBy(() -> new CassandraClusterInfoGroup(null)) +.isExactlyInstanceOf(IllegalArgumentException.class) +.hasMessage("clusterInfos cannot be null or empty"); + +assertThatThrownBy(() -> new CassandraClusterInfoGroup(Collections.emptyList())) +.isExactlyInstanceOf(IllegalArgumentException.class) +.hasMessage("clusterInfos cannot be null or empty"); +} + +@Test +void testLookupCluster() +{ +CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index)); +List clusters = group.clusters(); 
+assertThat(clusters).hasSize(2); +assertThat(group.cluster("cluster0")).isSameAs(clusters.get(0)); +assertThat(group.cluster("cluster1")).isSameAs(clusters.get(1)); +assertThat(group.cluster("cluster2")).isNull(); +} + +@Test +void testClusterId() +{ +CassandraClusterInfoGroup group = mockClusterGroup(2, index -> mockClusterInfo("cluster" + index)); +assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0, cluster1]"); + +group = mockClusterGroup(1, index -> mockClusterInfo("cluster" + index)); +assertThat(group.clusterId()).isEqualTo("ClusterInfoGroup: [cluster0]"); +} + +@Test +void testDelegationOfSingleCluster() +{ +CassandraClusterInfo clusterInfo = mockClusterInfo("cluster0"); +TokenRangeReplicasResponse response = TokenRangeMappingUtils.mockSimpleTokenRangeReplicasResponse(10, 3); +TokenRangeMapping expectedTokenRangeMapping = TokenRangeMapping.create(() -> response, + () -> Partitioner.Murmur3Partitioner, + RingInstance::new); + when(clusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(expectedTokenRangeMapping); + when(clusterInfo.timeSkew(any())).thenReturn(mock(TimeSkewResponse.class)); + when(clusterInfo.getLowestCassandraVersion()).thenReturn("lowestCassandraVersion"); + when(clusterInfo.clusterWriteAvailability()).thenReturn(Collections.emptyMap()); +CassandraClusterInfoGroup group = mockClusterGroup(1, index -> clusterInfo); +// Since there is a single clusterInfo in the group. It behaves as a
Re: [PR] CASSANDRA-19927: Deprecate old compression cache and move to using cache of Compressio… [cassandra-analytics]
yifan-c commented on code in PR #84: URL: https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763634919 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java: ## @@ -52,11 +52,12 @@ public class SidecarProvisionedSSTable extends SSTable { private static final long serialVersionUID = 6452703925812602832L; +@Deprecated // use SSTableCache @VisibleForTesting public static final Cache COMPRESSION_CACHE = CacheBuilder.newBuilder() - .expireAfterAccess(1, TimeUnit.HOURS) - .maximumSize(2048) - .build(); + .expireAfterAccess(1, TimeUnit.HOURS) + .maximumSize(2048) + .build(); Review Comment: Can we just remove it?
Re: [PR] CASSANDRA-19927: Deprecate old compression cache and move to using cache of Compressio… [cassandra-analytics]
yifan-c commented on code in PR #84: URL: https://github.com/apache/cassandra-analytics/pull/84#discussion_r1763624222 ## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableCache.java: ## @@ -108,6 +111,27 @@ public BloomFilter bloomFilter(@NotNull SSTable ssTable, Descriptor descriptor) return get(filter, ssTable, () -> ReaderUtils.readFilter(ssTable, descriptor.version.hasOldBfFormat())); } +public CompressionMetadata compressionMetaData(@NotNull SSTable ssTable, boolean hasMaxCompressedLength) throws IOException +{ +if (!"true".equalsIgnoreCase(System.getProperty("sbr.cache.compression.enabled", "true"))) Review Comment: How about naming the property `sbr.cache.compressionInfo.enabled`, to match the name of the component (compressionInfo)? I think there are utils to read from a map, but not from properties yet. We should refactor to unify the SBW and SBR code further. That is a future action item; I just want to mention it here. ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java: ## @@ -40,7 +40,9 @@ public final class Properties public static final int DEFAULT_MAX_RETRIES = 10; public static final long DEFAULT_MILLIS_TO_SLEEP = 500; public static final int DEFAULT_MAX_POOL_SIZE = 64; -public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = true; +@Deprecated +public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA = false; // use org.apache.cassandra.spark.reader.SSTable cache +@Deprecated public static final long DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_BYTES = 8 * MEBI_BYTES; // 8MiB Review Comment: Let's simply remove the fields that were newly introduced in the recent patch for CASSANDRA-19900. ## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java: ## @@ -628,22 +627,19 @@ public class SSTableStreamReader implements ISSTableScanner SSTableStreamReader() throws IOException { lastToken = sparkRangeFilter != null ? 
sparkRangeFilter.tokenRange().upperEndpoint() : null; -try (@Nullable InputStream compressionInfoInputStream = ssTable.openCompressionStream()) -{ -DataInputStream dataInputStream = new DataInputStream(ssTable.openDataStream()); +@Nullable CompressionMetadata compressionMetadata = SSTableCache.INSTANCE.compressionMetaData(ssTable, version.hasMaxCompressedLength()); Review Comment: nit: `metadata` is often used as a single word
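The review points out that the bulk reader (SBR) has no shared helper yet for reading settings from system properties. The following is a minimal sketch of such a helper. It assumes the renamed key `sbr.cache.compressionInfo.enabled` and keeps the default-to-`true` behaviour of the `"true".equalsIgnoreCase(System.getProperty(..., "true"))` check in the diff. `SbrProperties` and `booleanProp` are hypothetical names. The sketch uses `java.util.Properties`, not the project's own `org.apache.cassandra.spark.utils.Properties` constants class.

```java
import java.util.Properties;

// Hypothetical helper for reading SBR settings from system properties,
// in the spirit of the propOrDefault(...) calls used by SSTableCache.
final class SbrProperties
{
    static final String COMPRESSION_INFO_CACHE_ENABLED = "sbr.cache.compressionInfo.enabled";

    private SbrProperties()
    {
    }

    // Boolean.parseBoolean is case-insensitive, so this matches
    // "true".equalsIgnoreCase(value) when the property is set.
    static boolean booleanProp(Properties props, String key, boolean defaultValue)
    {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Boolean.parseBoolean(value);
    }

    static boolean compressionInfoCacheEnabled()
    {
        return booleanProp(System.getProperties(), COMPRESSION_INFO_CACHE_ENABLED, true);
    }
}
```

Passing the `Properties` object in as a parameter makes it possible to unit-test the helper without mutating JVM-wide system properties.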
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763626933 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -161,40 +160,22 @@ public void checkBulkWriterIsEnabledOrThrow() } /** - * @return the largest time skew retrieved from the target replicas + * @return the largest time skew retrieved from the target clusters */ @Override -public TimeSkewResponse getTimeSkew(List instances) +public TimeSkewResponse timeSkew(Range range) { if (clusterInfos.size() == 1) { -return clusterInfos.get(0).getTimeSkew(instances); +return clusterInfos.get(0).timeSkew(range); } -Map> instancesByClusterId = instances.stream().collect(Collectors.groupingBy(instance -> { -String clusterId = instance.clusterId(); -Preconditions.checkState(clusterId != null, - "RingInstance must define its clusterId for coordinated write"); -return clusterId; -})); -long localNow = System.currentTimeMillis(); -long maxDiff = 0; -TimeSkewResponse largestSkew = null; -for (Map.Entry> entry : instancesByClusterId.entrySet()) -{ -String clusterId = entry.getKey(); -List instancesOfCluster = entry.getValue(); -ClusterInfo clusterInfo = cluster(clusterId); -Preconditions.checkState(clusterInfo != null, "ClusterInfo not found with clusterId: " + clusterId); -TimeSkewResponse response = clusterInfo.getTimeSkew(instancesOfCluster); -long d = Math.abs(response.currentTime - localNow); -if (Math.abs(response.currentTime - localNow) > maxDiff) -{ -maxDiff = d; -largestSkew = response; -} -} -return largestSkew; +return clusterInfos.stream() + .map(clusterInfo -> clusterInfo.timeSkew(range)) +// Find the timeSkew with the lowest remote currentTime, i.e. largest difference with the local current time. Review Comment: Talked offline. We are going to refactor the method to do the validation, instead of exposing a response object. 
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
JeetKunDoug commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1763592085 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -161,40 +160,22 @@ public void checkBulkWriterIsEnabledOrThrow() } /** - * @return the largest time skew retrieved from the target replicas + * @return the largest time skew retrieved from the target clusters */ @Override -public TimeSkewResponse getTimeSkew(List instances) +public TimeSkewResponse timeSkew(Range range) { if (clusterInfos.size() == 1) { -return clusterInfos.get(0).getTimeSkew(instances); +return clusterInfos.get(0).timeSkew(range); } -Map> instancesByClusterId = instances.stream().collect(Collectors.groupingBy(instance -> { -String clusterId = instance.clusterId(); -Preconditions.checkState(clusterId != null, - "RingInstance must define its clusterId for coordinated write"); -return clusterId; -})); -long localNow = System.currentTimeMillis(); -long maxDiff = 0; -TimeSkewResponse largestSkew = null; -for (Map.Entry> entry : instancesByClusterId.entrySet()) -{ -String clusterId = entry.getKey(); -List instancesOfCluster = entry.getValue(); -ClusterInfo clusterInfo = cluster(clusterId); -Preconditions.checkState(clusterInfo != null, "ClusterInfo not found with clusterId: " + clusterId); -TimeSkewResponse response = clusterInfo.getTimeSkew(instancesOfCluster); -long d = Math.abs(response.currentTime - localNow); -if (Math.abs(response.currentTime - localNow) > maxDiff) -{ -maxDiff = d; -largestSkew = response; -} -} -return largestSkew; +return clusterInfos.stream() + .map(clusterInfo -> clusterInfo.timeSkew(range)) +// Find the timeSkew with the lowest remote currentTime, i.e. largest difference with the local current time. Review Comment: This isn't really right, as it would be possible for the current time on a remote host to be much _greater_ than the current time on the host running the task. 
We really need to do the whole "take the absolute value of the difference between the times" thing, not just look for the min currentTime. I realize also that for multi-cluster scenarios, the other field in the TimeSkewResponse (`allowableSkewInMinutes`) can, in fact, be different (although in theory I'd doubt that it would be). We should really grab 2 things: 1) The time skew response with the maximum _difference_ between localNow and the remote time, where localNow is calculated each time you get a response back (per my previous suggestion) 2) The _minimum_ `allowableSkewInMinutes` value And then craft the "combined" time skew response from those two things. ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroupTest.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import org.apache.cassandra.spark.bulkwriter.CassandraClusterInfo; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils; +import org.apache.cassandra.spark.bulkwriter.WriteAv
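JeetKunDoug's merge rule for multi-cluster time skew has two parts. First, keep the response with the largest absolute difference from the local clock, whether the remote clock is ahead or behind. Second, keep the smallest `allowableSkewInMinutes` across all clusters. The following is a minimal sketch of that rule. It assumes each per-cluster result carries the remote time in epoch millis, its allowance, and the `localNow` captured when that response arrived. `SkewSample` and `combine` are hypothetical stand-ins, not the sidecar client's `TimeSkewResponse`.

```java
import java.util.List;

// Hypothetical per-cluster sample: the remote clock, the allowance reported by that
// cluster, and the local clock captured when this response was received.
final class SkewSample
{
    final long remoteTimeMillis;
    final int allowableSkewInMinutes;
    final long localNowMillis;

    SkewSample(long remoteTimeMillis, int allowableSkewInMinutes, long localNowMillis)
    {
        this.remoteTimeMillis = remoteTimeMillis;
        this.allowableSkewInMinutes = allowableSkewInMinutes;
        this.localNowMillis = localNowMillis;
    }

    long absSkewMillis()
    {
        return Math.abs(remoteTimeMillis - localNowMillis);
    }

    // Keeps the sample with the largest absolute skew (remote ahead or behind)
    // and pairs it with the smallest allowance reported by any cluster.
    static SkewSample combine(List<SkewSample> samples)
    {
        if (samples.isEmpty())
        {
            throw new IllegalArgumentException("samples cannot be empty");
        }
        SkewSample worst = samples.get(0);
        int minAllowance = Integer.MAX_VALUE;
        for (SkewSample sample : samples)
        {
            if (sample.absSkewMillis() > worst.absSkewMillis())
            {
                worst = sample;
            }
            minAllowance = Math.min(minAllowance, sample.allowableSkewInMinutes);
        }
        return new SkewSample(worst.remoteTimeMillis, minAllowance, worst.localNowMillis);
    }
}
```

Suppose one cluster is 9 minutes ahead and another is 2 minutes behind. Picking the lowest remote `currentTime` would select the cluster that is behind, with a 2-minute skew. `combine` instead keeps the 9-minute skew, which is the case the review comment warns about.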
[PR] Deprecate old compression cache and move to using cache of Compressio… [cassandra-analytics]
jberragan opened a new pull request, #84: URL: https://github.com/apache/cassandra-analytics/pull/84 …nMetadata, so that:
- we no longer cache an entire byte array on heap
- we cache and reuse the CompressionMetadata object, so that only one BigLongArray object is allocated for the chunk offsets
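The caching idea in this PR description can be illustrated generically: cache the parsed metadata object rather than the raw bytes, so that all readers share one instance and therefore one offsets array. This is a hedged sketch, not the code in PR #84. `ParsedMetadataCache` is a hypothetical name, and the `long[]` in the usage stands in for `BigLongArray`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical sketch: cache one parsed metadata object per key (e.g. per SSTable),
 * so every reader shares that instance instead of re-parsing cached raw bytes.
 */
final class ParsedMetadataCache<K, V>
{
    private final ConcurrentMap<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;

    ParsedMetadataCache(Function<K, V> loader)
    {
        this.loader = loader;
    }

    V get(K key)
    {
        // ConcurrentHashMap.computeIfAbsent applies the loader at most once per key,
        // so concurrent readers of the same key receive the same instance
        return cache.computeIfAbsent(key, loader);
    }

    int size()
    {
        return cache.size();
    }
}
```

This sketch is unbounded. A production cache would likely also need a size bound or an eviction policy.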
Re: [PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]
jberragan commented on PR #81: URL: https://github.com/apache/cassandra-analytics/pull/81#issuecomment-2353411319 Merged in.
Re: [PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]
jberragan closed pull request #81: Ninja fix for CASSANDRA-19815 URL: https://github.com/apache/cassandra-analytics/pull/81
Re: [PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]
yifan-c commented on PR #83: URL: https://github.com/apache/cassandra-analytics/pull/83#issuecomment-2350465869 After #80
[PR] CASSANDRA-19923: Add transport extension for coordinated write [cassandra-analytics]
yifan-c opened a new pull request, #83: URL: https://github.com/apache/cassandra-analytics/pull/83 (no comment)
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1759270780 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import org.apache.cassandra.bridge.CassandraVersionFeatures; +import org.apache.cassandra.spark.bulkwriter.CassandraContext; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.WriteAvailability; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.MapUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider +{ +private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class); + +private static final long serialVersionUID = 5337884321245616172L; + +// immutable +private final List clusterInfos; +private transient volatile Map clusterInfoById; +private transient volatile TokenRangeMapping consolidatedTokenRangeMapping; + +public CassandraClusterInfoGroup(List clusterInfos) +{ +Preconditions.checkArgument(clusterInfos != null && !clusterInfos.isEmpty(), +"clusterInfos cannot be null or empty"); +this.clusterInfos = Collections.unmodifiableList(clusterInfos); +buildClusterInfoById(); +} + +@Override +public void refreshClusterInfo() +{ +runOnEach(ClusterInfo::refreshClusterInfo); +} + 
+@Override +public TokenRangeMapping getTokenRangeMapping(boolean cached) +{ +if (clusterInfos.size() == 1) +{ +return clusterInfos.get(0).getTokenRangeMapping(cached); +} + +if (!cached || consolidatedTokenRangeMapping == null) +{ +synchronized (this) +{ +// return immediately if consolidatedTokenRangeMapping has been initialized and call-site asks for the cached value +if (cached && consolidatedTokenRangeMapping != null) +{ +return consolidatedTokenRangeMapping; +} +Map> aggregated = applyOnEach(c -> c.getTokenRangeMapping(cached)); +consolidatedTokenRangeMapping = TokenRangeMapping.consolidate(new ArrayList<>(aggregated.values())); +} +} + +return consolidatedTokenRangeMapping; +} + +/** + * @return the lowest cassandra version among all clusters + */ +@Override +public String getLowestCassandraVersion() +{ +if (clusterInfos.size() == 1) +{ +return clusterInfos.get(0).getLowestCassandraVersion(); +} + +Map aggregated = applyOnEach(ClusterInfo::getLowestCassandraVersion); +List versions = aggregated.values() +.stream() + .map(CassandraVersionFeatures::cassandraVersionFeaturesFromCassandraVersion) +.sorted() + .collect(Collectors.toList()); +CassandraVersionFeatures first = versions
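The double-checked caching in `getTokenRangeMapping(boolean cached)` can be illustrated on its own. The following is a minimal sketch, not the PR's code. `ConsolidatingCache` is a hypothetical name. Each `Supplier` stands in for one cluster's `getTokenRangeMapping` call, and the `consolidate` function stands in for `TokenRangeMapping.consolidate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of double-checked caching of a consolidated value.
 * Each Supplier stands in for one cluster's lookup.
 */
final class ConsolidatingCache<T, R>
{
    private final List<Supplier<T>> sources;
    private final Function<List<T>, R> consolidate;
    // volatile: a value published under the lock is safely visible to the lock-free fast path
    private volatile R consolidated;

    ConsolidatingCache(List<Supplier<T>> sources, Function<List<T>, R> consolidate)
    {
        this.sources = sources;
        this.consolidate = consolidate;
    }

    R get(boolean cached)
    {
        R current = consolidated;
        if (cached && current != null)
        {
            return current; // fast path: cached value requested and already built
        }
        synchronized (this)
        {
            // Second check: another thread may have built the value while we waited for the lock
            if (cached && consolidated != null)
            {
                return consolidated;
            }
            List<T> parts = new ArrayList<>(sources.size());
            for (Supplier<T> source : sources)
            {
                parts.add(source.get());
            }
            R fresh = consolidate.apply(parts);
            consolidated = fresh;
            return fresh;
        }
    }
}
```

A cached read returns the published value without taking the lock. An uncached read always refetches from every source. The sketch differs from the quoted diff in one respect: it returns the value it built or read under the lock, rather than re-reading the volatile field afterwards, so the caller gets exactly the value that call produced.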
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757684085 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -0,0 +1,300 @@
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757680508 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -0,0 +1,300 @@ +public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider Review Comment: Done
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757679225 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -0,0 +1,300 @@
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757674951 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/MultiClusterInfoProvider.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.List; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Provider for multiple ClusterInfo and lookup + */ +public interface MultiClusterInfoProvider Review Comment: I think `Provider` is appropriate here: the interface provides the getters for the clusters.
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757673435 ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/MapUtils.java: ## @@ -279,4 +282,19 @@ public static T resolveDeprecated(Map options, String option return deprecatedOptionValue == null ? resolver.apply(null) : deprecatedOptionValue; } + +/** + * Get the first map entry from the map + * + * @return the first map entry, if there are at least one entry in the map + * @throws NoSuchElementException if the map is emtpy + */ +public static Map.Entry firstEntry(@NotNull Map map) Review Comment: OK, removing the code. It is not that useful.
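The generic type parameters in the quoted `MapUtils` signatures appear to have been stripped (e.g. `public static T resolveDeprecated(Map options, ...)`). For reference, below is a minimal sketch of a `firstEntry` helper matching the quoted Javadoc, with the generics restored. `MapUtilsSketch` is a hypothetical name, and this is not the removed code.

```java
import java.util.Map;
import java.util.NoSuchElementException;

final class MapUtilsSketch
{
    /**
     * Returns the first entry in the map's iteration order.
     * "First" is only meaningful for ordered maps such as LinkedHashMap or TreeMap;
     * for a HashMap it is an arbitrary entry.
     *
     * @throws NoSuchElementException if the map is empty
     */
    static <K, V> Map.Entry<K, V> firstEntry(Map<K, V> map)
    {
        // Iterator.next() throws NoSuchElementException when there is no element
        return map.entrySet().iterator().next();
    }
}
```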
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757673277 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import org.apache.cassandra.bridge.CassandraVersionFeatures; +import org.apache.cassandra.spark.bulkwriter.CassandraContext; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.WriteAvailability; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.MapUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider +{ +private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class); + +private static final long serialVersionUID = 5337884321245616172L; + +// immutable +private final List clusterInfos; +private final Map clusterInfoById; +// map key might be stale, e.g. instance is removed from cluster, but it remains in the map. 
In such case, we do not expect call-sites use the stale key +private final Map instanceToClusterMap = new HashMap<>(); + +public CassandraClusterInfoGroup(List clusterInfos) +{ +Preconditions.checkArgument(clusterInfos != null && !clusterInfos.isEmpty(), +"clusterInfos cannot be null or empty"); +this.clusterInfos = Collections.unmodifiableList(clusterInfos); +this.clusterInfoById = clusterInfos.stream().collect(Collectors.toMap(ClusterInfo::clusterId, Function.identity())); +} + +@Override +public void refreshClusterInfo() +{ +runOnEach(ClusterInfo::refreshClusterInfo); +} + +@Override +public TokenRangeMapping getTokenRangeMapping(boolean cached) +{ +if (clusterInfos.size() == 1) +{ +return clusterInfos.get(0).getTokenRangeMapping(cached); +} + +Map> aggregated = applyOnEach(c -> c.getTokenRangeMapping(cached)); +// When there are multiple clusters, populate the reverse lookup map when fetching latest or initializing +if (!cached || aggregated.isEmpty()) +{ +// todo: synchronize and clear the reverse lookup map? +instanceToClusterMap.clear(); +aggregated.forEach((clusterId, mapping) -> mapping.getTokenRanges() + .keySet() + .forEach(instance -> instanceToClusterMap.put(instance, clusterId))); +} + +return TokenRangeMapping.consolidate(new ArrayList<>(aggregated.values())); +} + +/** + * @return the lowest cassandra version among all clusters + */ +@Override +public String getLowestCassandraVersion() +{ +if (clusterInfos.size() == 1) +{ +return clusterInfos.get(0).getLowestCassandraVersion(); +} + +Map aggregated = applyOnEach(ClusterInfo::getLowestCassandraVersion); +List versions = aggregated.values() +.stream() + .map(CassandraVersionF
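The `// todo: synchronize and clear the reverse lookup map?` question and the stale-key comment above both concern how to rebuild `instanceToClusterMap` safely. One option is to build a fresh map on each refresh and publish it with a single volatile write, instead of calling `clear()` and refilling a shared `HashMap` that readers may be using. This is a hedged sketch. `InstanceToClusterIndex` and its methods are hypothetical names, and instances are keyed by a generic type rather than `RingInstance`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of an instance -> cluster reverse lookup that is rebuilt
 * as a new snapshot on each refresh and published atomically.
 */
final class InstanceToClusterIndex<I>
{
    // Replaced wholesale on each rebuild; readers see either the old or the new snapshot
    private volatile Map<I, String> instanceToCluster = Collections.emptyMap();

    void rebuild(Map<String, Set<I>> instancesByClusterId)
    {
        Map<I, String> fresh = new HashMap<>();
        instancesByClusterId.forEach((clusterId, instances) ->
            instances.forEach(instance -> fresh.put(instance, clusterId)));
        // Instances that left every cluster are simply absent, so no stale keys survive
        instanceToCluster = Collections.unmodifiableMap(fresh);
    }

    String clusterIdOf(I instance)
    {
        return instanceToCluster.get(instance); // null if the instance is unknown
    }
}
```

If two rebuilds race, the last published snapshot wins. No reader ever observes a half-cleared map.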
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757669477 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java: ## @@ -105,16 +101,145 @@ void testCreateTokenRangeMappingWithPending() } TokenRangeMapping topology = TokenRangeMapping.create(() -> response, () -> Partitioner.Murmur3Partitioner, -() -> rf, RingInstance::new); + RingInstance::new); assertThat(topology.pendingInstances()).hasSize(pendingCount); } +@Test +void testConsolidateListOfTokenRangeMappingFails() Review Comment: The `TokenRangeMapping` covers the entire ring and contains only the unwrapped ranges already.
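A wrap-around range is one whose start token is greater than or equal to its end token, so it crosses the ring boundary. Unwrapping splits such a range at that boundary. Below is a hedged sketch of unwrapping on a Murmur3-style ring of signed 64-bit tokens with `(start, end]` ranges. `TokenRangeSketch` is a hypothetical name, and `TokenRangeMapping`'s internals are not shown in this thread. Treating an end of `Long.MIN_VALUE` as "to the end of the ring" is an assumption, modeled on how Cassandra ranges are commonly interpreted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch of unwrapping a wrap-around token range on a ring of
 * signed 64-bit tokens, where ranges are (start, end].
 */
final class TokenRangeSketch
{
    static final long MIN_TOKEN = Long.MIN_VALUE; // assumed ring minimum
    static final long MAX_TOKEN = Long.MAX_VALUE; // assumed ring maximum

    static final class Range
    {
        final long start; // exclusive
        final long end;   // inclusive

        Range(long start, long end)
        {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString()
        {
            return "(" + start + ", " + end + "]";
        }
    }

    static List<Range> unwrap(Range range)
    {
        if (range.end == MIN_TOKEN || range.start < range.end)
        {
            // Not wrapping: an ordinary range, or one assumed to run to the end of the ring
            return Collections.singletonList(range);
        }
        // start >= end: the range crosses the ring boundary, so split it there
        List<Range> pieces = new ArrayList<>(2);
        if (range.start != MAX_TOKEN)
        {
            pieces.add(new Range(range.start, MAX_TOKEN));
        }
        pieces.add(new Range(MIN_TOKEN, range.end));
        return pieces;
    }
}
```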
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
JeetKunDoug commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757195386 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/MultiClusterInfoProvider.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.List; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Provider for multiple ClusterInfo and lookup + */ +public interface MultiClusterInfoProvider Review Comment: NIT: this is just `MultiClusterInfo` like the previous PR - no need for `Provider` in the interface name. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CassandraClusterInfoGroup.java: ## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; +import org.apache.cassandra.bridge.CassandraVersionFeatures; +import org.apache.cassandra.spark.bulkwriter.CassandraContext; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.WriteAvailability; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.spark.utils.MapUtils; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class CassandraClusterInfoGroup implements ClusterInfo, MultiClusterInfoProvider +{ +private static final Logger LOGGER = LoggerFactory.getLogger(CassandraClusterInfoGroup.class); + +private static final 
long serialVersionUID = 5337884321245616172L; + +// immutable +private final List clusterInfos; +private final Map clusterInfoById; +// map key might be stale, e.g. instance is removed from cluster, but it remains in the map. In such case, we do not expect call-sites use the stale key +private final Map instanceToClusterMap = new HashMap<>(); + +public CassandraClusterInfoGroup(List clusterInfos) +{ +Preconditions.checkArgument(clusterInfos != null && !clusterInfos.isEmpty(), +"clusterInfos cannot be null or empty"); +this.clusterInfos = Collections.unmodifiableList(clusterInfos); +this.clusterInfoById = clusterInfos.stream().collect(Collectors.toMap(ClusterInfo::clusterId, Function.identity())); +} + +@Override +public void refreshClusterInfo() +{ +runOnEach(ClusterInfo::refreshClusterInfo); +} + +@Override +public TokenRangeMapping getTokenRangeMapping(boolean
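The `CassandraClusterInfoGroup` constructor above indexes clusters with `Collectors.toMap(ClusterInfo::clusterId, Function.identity())`. The stdlib-only sketch below reproduces that step, using a hypothetical `FakeCluster` as a stand-in for `ClusterInfo`. It shows one side effect of calling `toMap` without a merge function: if two clusters report the same ID, `toMap` throws `IllegalStateException`. Duplicate IDs therefore fail at construction time instead of being silently overwritten.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ClusterIndexSketch
{
    // Hypothetical stand-in for ClusterInfo; only the cluster ID matters here
    static final class FakeCluster
    {
        private final String id;

        FakeCluster(String id)
        {
            this.id = id;
        }

        String clusterId()
        {
            return id;
        }
    }

    // Mirrors the indexing done in the CassandraClusterInfoGroup constructor.
    // Collectors.toMap without a merge function throws IllegalStateException
    // when two clusters report the same ID.
    static Map<String, FakeCluster> indexById(List<FakeCluster> clusters)
    {
        if (clusters == null || clusters.isEmpty())
        {
            throw new IllegalArgumentException("clusterInfos cannot be null or empty");
        }
        return Collections.unmodifiableMap(clusters.stream()
                                                   .collect(Collectors.toMap(FakeCluster::clusterId, Function.identity())));
    }
}
```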
[PR] Improve build-dependencies.sh script to set CASSANDRA_USE_JDK11 if an… [cassandra-analytics]
jberragan opened a new pull request, #82: URL: https://github.com/apache/cassandra-analytics/pull/82 …alytics is running on JDK11 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
JeetKunDoug commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1757005825 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java: ## @@ -105,16 +101,145 @@ void testCreateTokenRangeMappingWithPending() } TokenRangeMapping topology = TokenRangeMapping.create(() -> response, () -> Partitioner.Murmur3Partitioner, -() -> rf, RingInstance::new); + RingInstance::new); assertThat(topology.pendingInstances()).hasSize(pendingCount); } +@Test +void testConsolidateListOfTokenRangeMappingFails() Review Comment: Is it possible to add some tests (or modify the replica building logic) to cover the entire token range of the partitioner so we have wrap-around ranges in these tests please?
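A small helper can make the request for wrap-around coverage concrete. This is a sketch under stated assumptions, not the test utilities in PR #80. It assumes Murmur3 bounds of -2^63 and 2^63 - 1 and uses open-closed `(start, end]` ranges. The class and method names (`WrapAroundRanges`, `ringRanges`, `totalWidth`) are hypothetical. The last range of the ring, `(lastToken, firstToken]`, wraps past the maximum token, so the helper splits it at the partitioner bounds. A test can then check that the range widths add up to the whole ring.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

public class WrapAroundRanges
{
    // Assumed Murmur3Partitioner token bounds: -2^63 and 2^63 - 1
    static final BigInteger MIN_TOKEN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX_TOKEN = BigInteger.valueOf(Long.MAX_VALUE);

    // Builds open-closed ranges (start, end], stored as {start, end}, that cover the whole ring
    static List<BigInteger[]> ringRanges(SortedSet<BigInteger> tokens)
    {
        if (tokens.isEmpty())
        {
            throw new IllegalArgumentException("at least one token is required");
        }
        List<BigInteger[]> ranges = new ArrayList<>();
        BigInteger previous = null;
        for (BigInteger token : tokens)
        {
            if (previous != null)
            {
                ranges.add(new BigInteger[]{ previous, token });
            }
            previous = token;
        }
        // The wrap-around range (last, first] crosses the end of the ring,
        // so it is split at the partitioner bounds into (last, MAX] and (MIN, first]
        ranges.add(new BigInteger[]{ tokens.last(), MAX_TOKEN });
        ranges.add(new BigInteger[]{ MIN_TOKEN, tokens.first() });
        return ranges;
    }

    // Sum of (end - start); a gap-free, non-overlapping cover adds up to MAX - MIN
    static BigInteger totalWidth(List<BigInteger[]> ranges)
    {
        BigInteger total = BigInteger.ZERO;
        for (BigInteger[] range : ranges)
        {
            total = total.add(range[1].subtract(range[0]));
        }
        return total;
    }
}
```

With tokens `-100`, `0` and `100`, the helper yields four ranges. The last two are the halves of the wrap-around range.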
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1756085954 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java: ## @@ -113,12 +139,10 @@ Multimap> tokenRangesByInstance(List writeRepl } public TokenRangeMapping(Partitioner partitioner, - ReplicationFactor replicationFactor, Review Comment: Since `TokenRangeMapping` can now be consolidated, it no longer makes sense for it to contain `ReplicationFactor`, which is tied to a cluster. `TokenRangeMapping` (the topology) only describes the ring layout, while `ReplicationFactor` is for consistency coordination. It does not belong in `TokenRangeMapping` in the first place.
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1756080437 ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlTable.java: ## @@ -39,7 +39,7 @@ @SuppressWarnings({ "WeakerAccess", "unused" }) public class CqlTable implements Serializable { -public static final long serialVersionUID = 42L; +private static final long serialVersionUID = 1018995207366817661L; Review Comment: 42 is the ultimate answer, but likely a common one. Changed to a random value.
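Java serialization reads the declared `serialVersionUID` through `java.io.ObjectStreamClass`. This gives a quick way to check which UID a class will write into its stream. In this sketch, `TableLike` is a hypothetical stand-in for `CqlTable` and carries the new private UID from the diff.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionSketch
{
    // Hypothetical class standing in for CqlTable. Declaring the UID explicitly
    // pins the stream identity instead of relying on the default value computed
    // from the class structure, which can change whenever the class changes.
    static class TableLike implements Serializable
    {
        private static final long serialVersionUID = 1018995207366817661L;

        private final String keyspace;

        TableLike(String keyspace)
        {
            this.keyspace = keyspace;
        }

        String keyspace()
        {
            return keyspace;
        }
    }

    // Reads the UID that Java serialization will write for the given class
    static long streamUid(Class<? extends Serializable> type)
    {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }
}
```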
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on code in PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#discussion_r1756079450 ## cassandra-analytics-common/build.gradle: ## @@ -41,20 +41,21 @@ publishing { } dependencies { -implementation "org.slf4j:slf4j-api:${slf4jApiVersion}" -compileOnly "com.esotericsoftware:kryo-shaded:${kryoVersion}" -compileOnly "com.google.guava:guava:${guavaVersion}" -compileOnly "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" -compileOnly "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" -compileOnly "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" +implementation("org.slf4j:slf4j-api:${slf4jApiVersion}") +compileOnly("com.esotericsoftware:kryo-shaded:${kryoVersion}") +compileOnly("com.google.guava:guava:${guavaVersion}") + compileOnly("com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}") +compileOnly("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}") + compileOnly("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}") -testImplementation "com.google.guava:guava:${guavaVersion}" -testImplementation "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}" -testImplementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}" -testImplementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" - testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}") - testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}") - testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}") +testImplementation("com.google.guava:guava:${guavaVersion}") + testImplementation("com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}") + testImplementation("com.fasterxml.jackson.core:jackson-core:${jacksonVersion}") + testImplementation("com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}") +testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}") + 
testImplementation("org.junit.jupiter:junit-jupiter-params:${junitVersion}") + testImplementation("org.junit.jupiter:junit-jupiter-engine:${junitVersion}") +testImplementation("org.assertj:assertj-core:${assertjCoreVersion}") Review Comment: The actual change is this line, which adds the assertj-core dependency. The other lines in the file were changed to unify the dependency declaration style.
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#issuecomment-2345227918 > I will rebase once #79 is merged #79 is merged. Rebase is done.
Re: [PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters [cassandra-analytics]
yifan-c merged PR #79: URL: https://github.com/apache/cassandra-analytics/pull/79
Re: [PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]
jberragan commented on PR #81: URL: https://github.com/apache/cassandra-analytics/pull/81#issuecomment-2345106427 Green CircleCI: https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/100/workflows/813b00fb-1807-4471-bb94-d2685bbe7c91
Re: [PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters [cassandra-analytics]
JeetKunDoug commented on code in PR #79: URL: https://github.com/apache/cassandra-analytics/pull/79#discussion_r1755318108 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java: ## @@ -223,7 +227,8 @@ public BulkSparkConf(SparkConf conf, Map options) } this.jobTimeoutSeconds = MapUtils.getLong(options, WriterOptions.JOB_TIMEOUT_SECONDS.name(), -1L); this.configuredJobId = MapUtils.getOrDefault(options, WriterOptions.JOB_ID.name(), null); - +this.coordinatedWriteConfJson = MapUtils.getOrDefault(options, WriterOptions.COORDINATED_WRITE_CONFIG.name(), null); +this.coordinatedWriteConf = buildCoordinatedWriteConf(); // must only call the build method at this step, after the related fields have been resolved Review Comment: If, instead, you passed the necessary parameters to `buildCoordinatedWriteConf` you could ensure the order was preserved, rather than just having a comment here. Move the build method to CoordinatedWriteConf as a static method and call it where necessary. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java: ## @@ -282,6 +287,63 @@ Set sidecarContactPoints() return sidecarContactPoints; } +public boolean isCoordinatedWriteConfigured() +{ +return coordinatedWriteConf != null; +} + +public CoordinatedWriteConf coordinatedWriteConf() +{ +if (coordinatedWriteConf == null) +{ +coordinatedWriteConf = buildCoordinatedWriteConf(); +} + +return coordinatedWriteConf; +} + +@Nullable +protected CoordinatedWriteConf buildCoordinatedWriteConf() Review Comment: See above - this logic I think really belongs in CoordinatedWriteConf.build() or something. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConf.java: ## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.TypeFactory; +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; +import org.apache.cassandra.spark.common.SidecarInstanceFactory; +import org.jetbrains.annotations.Nullable; + +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toSet; + +/** + * Data class containing the configurations required for coordinated write. + * The serialization format is JSON string. The class takes care of serialization and deserialization. 
+ */ +public class CoordinatedWriteConf +{ +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); +// The runtime type of ClusterConfProvider is erased; use clustersOf method to read the desired type back +private final Map clusters; + +/** + * Parse JSON string and create a CoordinatedWriteConf object with the specified ClusterConfProvider format + * + * @param json JSON string + * @param clusterConfType concrete type of ClusterConfProvider that can be used for JSON serialization and deserialization + * @return CoordinatedWriteConf object + * @param subtype of ClusterConfProvider + */ +public static +CoordinatedWriteConf fromJson(String json, Class clusterConfType) +{ +JavaType javaType = TypeFactory.defaultInstance().constructMapType(Map.class, String.class, clusterConfType); +
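This review makes two points that can be sketched together. The first is the suggestion to move the build step into a static method on `CoordinatedWriteConf`, so that its parameters enforce the call ordering. The second is the comment that the `ClusterConfProvider` runtime type is erased and is read back through `clustersOf`. Everything in the sketch is hypothetical: `ClusterConf`, `SimpleConf` and `build` are illustrative names. A caller-supplied `Function` replaces the JSON parsing that the real `fromJson` does with Jackson. The diff does not show the body of the real `clustersOf`, so the `Class.cast` approach here is only one plausible implementation.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class CoordinatedConfSketch
{
    // Hypothetical stand-in for ClusterConfProvider
    interface ClusterConf
    {
        String localDc();
    }

    // Hypothetical concrete type, playing the role of SimpleClusterConf
    static final class SimpleConf implements ClusterConf
    {
        private final String localDc;

        SimpleConf(String localDc)
        {
            this.localDc = localDc;
        }

        @Override
        public String localDc()
        {
            return localDc;
        }
    }

    // Values are held under the common interface; clustersOf recovers the concrete type
    private final Map<String, ClusterConf> clusters;

    private CoordinatedConfSketch(Map<String, ? extends ClusterConf> clusters)
    {
        this.clusters = Collections.unmodifiableMap(new LinkedHashMap<String, ClusterConf>(clusters));
    }

    // Static factory in the spirit of the review suggestion: every input is an explicit
    // parameter, so it cannot be called before those inputs are resolved.
    // The parser stands in for the Jackson-based fromJson; null means "not configured".
    static CoordinatedConfSketch build(String json,
                                       Function<String, Map<String, ? extends ClusterConf>> parser)
    {
        if (json == null || json.trim().isEmpty())
        {
            return null;
        }
        return new CoordinatedConfSketch(parser.apply(json));
    }

    // Casts each value to the requested type; throws ClassCastException on a mismatch
    <T extends ClusterConf> Map<String, T> clustersOf(Class<T> type)
    {
        Map<String, T> result = new LinkedHashMap<>();
        clusters.forEach((id, conf) -> result.put(id, type.cast(conf)));
        return Collections.unmodifiableMap(result);
    }
}
```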
[PR] Ninja fix for CASSANDRA-19815 [cassandra-analytics]
jberragan opened a new pull request, #81: URL: https://github.com/apache/cassandra-analytics/pull/81 Fix Scala 2.13 build
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
frankgh merged PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71
Re: [I] Donate GoCQL to Apache Cassandra [YOUR ACTION REQUIRED] [cassandra-gocql-driver]
mattheath commented on issue #1751: URL: https://github.com/apache/cassandra-gocql-driver/issues/1751#issuecomment-234838 @michaelsembwever I'm happy to assign any contributions I've made to this to the ASF. Sorry for the delay!
Re: [PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c commented on PR #80: URL: https://github.com/apache/cassandra-analytics/pull/80#issuecomment-2342424833 I will rebase once https://github.com/apache/cassandra-analytics/pull/79 is merged.
[PR] CASSANDRA-19910: Support data partitioning for multiple clusters coordinated write [cassandra-analytics]
yifan-c opened a new pull request, #80: URL: https://github.com/apache/cassandra-analytics/pull/80 (no comment)
Re: [PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONF to define coordinated write to multiple Cassandra clusters [cassandra-analytics]
yifan-c commented on code in PR #79: URL: https://github.com/apache/cassandra-analytics/pull/79#discussion_r1752673693 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/coordinatedwrite/CoordinatedWriteConfTest.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cassandra.spark.bulkwriter.coordinatedwrite; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConfProvider; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.SimpleClusterConf; +import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class CoordinatedWriteConfTest Review Comment: I believe the test case is already there. ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java: ## @@ -275,6 +277,70 @@ void testSidecarContactPoints() .isNull(); } +@Test +void testReadCoordinatedWriteConfFails() Review Comment: I believe the test case is already there.
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1752638296 ## cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java: ## @@ -176,16 +176,18 @@ public CassandraBridgeImplementation() kryoSerializers.put(CqlField.class, new CqlField.Serializer(cassandraTypes())); kryoSerializers.put(CqlTable.class, new CqlTable.Serializer(cassandraTypes())); kryoSerializers.put(CqlUdt.class, new CqlUdt.Serializer(cassandraTypes())); - -nativeTypes = allTypes().stream().collect(Collectors.toMap(CqlField.CqlType::name, Function.identity())); } -@Override public CassandraTypes cassandraTypes() { return CassandraTypesImplementation.INSTANCE; } +public SparkSqlTypeConverter typeConverter() +{ +return SparkSqlTypeConverterImplementation.INSTANCE; Review Comment: We could remove the Spark dependency from `cassandra-bridge` but it would be a lot more work and probably better to do outside this PR.
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1752631902 ## cassandra-bridge/build.gradle: ## @@ -40,6 +40,7 @@ publishing { dependencies { api(project(':cassandra-analytics-common')) +api(project(':cassandra-analytics-spark-converter')) Review Comment: I think the bridge is the connecting point between Cassandra and Spark. It already depends on the `spark-core` and `spark-sql` jars.
[PR] CASSANDRA-19909: Add writer options COORDINATED_WRITE_CONF to define coordinated write to multiple Cassandra clusters [cassandra-analytics]
yifan-c opened a new pull request, #79: URL: https://github.com/apache/cassandra-analytics/pull/79 The option specifies the configuration (in JSON) for coordinated write. See org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf. When the option is present, SIDECAR_CONTACT_POINTS, SIDECAR_INSTANCES and LOCAL_DC are ignored. Patch by Yifan Cai; Reviewed by TBD for CASSANDRA-19909
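The precedence rule in the PR description can be expressed as a small option-resolution step. This is a minimal sketch that assumes options arrive as a `Map<String, String>` keyed by option name. It uses the name `COORDINATED_WRITE_CONFIG` from the later revision of the PR. `effectiveOptions` is a hypothetical helper, and modeling "ignored" as removal is an illustrative choice, not necessarily what `BulkSparkConf` does.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoordinatedOptionPrecedence
{
    // Option name as it appears in the later revision of PR #79
    static final String COORDINATED_WRITE_CONFIG = "COORDINATED_WRITE_CONFIG";

    // Single-cluster options that the PR description says are ignored in coordinated mode
    static final List<String> SINGLE_CLUSTER_OPTIONS =
        Arrays.asList("SIDECAR_CONTACT_POINTS", "SIDECAR_INSTANCES", "LOCAL_DC");

    // Hypothetical helper: returns a copy of the options with the single-cluster
    // settings dropped whenever a coordinated write configuration is supplied
    static Map<String, String> effectiveOptions(Map<String, String> options)
    {
        Map<String, String> effective = new HashMap<>(options);
        if (effective.get(COORDINATED_WRITE_CONFIG) != null)
        {
            SINGLE_CLUSTER_OPTIONS.forEach(effective::remove);
        }
        return effective;
    }
}
```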
Re: [I] How to set limit for pool connection [cassandra-gocql-driver]
OleksiienkoMykyta commented on issue #1741: URL: https://github.com/apache/cassandra-gocql-driver/issues/1741#issuecomment-2340521705 NumConns sets the initial number of connections per node (2 by default), not the maximum number of connections per node. Also, by default gocql maintains a minimum number of connections, but as demand increases it will open more connections to handle the load. I think you are looking for connection-limit functionality similar to what the Python and Java drivers have. We could add a MaxNumberOfConnections option (or similar) to set the desired limit, and we should add a detailed description of NumConns to the documentation.
Re: [PR] CASSANDRA-19900: Make the compression cache configurable to reduce he… [cassandra-analytics]
belliottsmith commented on code in PR #77: URL: https://github.com/apache/cassandra-analytics/pull/77#discussion_r1749723810 ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java: ## @@ -40,6 +40,8 @@ public final class Properties public static final int DEFAULT_MAX_RETRIES = 10; public static final long DEFAULT_MILLIS_TO_SLEEP = 500; public static final int DEFAULT_MAX_POOL_SIZE = 64; +public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA_KEY = true; +public static final int DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 8388608; // 8MiB Review Comment: For future reference, shifting left by a multiple of 10 multiplies by a power of 1024 (KiB, MiB, ...), so 8 << 20 is a clean way to say 8 mebibytes.
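Here is the arithmetic behind the suggestion. `1 << 10` is 1024, so each additional 10-bit shift moves up one binary prefix (KiB, MiB, GiB). The sketch below shows that three forms are the same value: the literal, the shift, and the `8 * MEBI_BYTES` form suggested in another review of this PR. `MEBI_BYTES` is defined locally here as an assumption, because its definition in `Properties` is not shown.

```java
public class ByteSizeShifts
{
    // Each left shift by 10 bits multiplies by 2^10 = 1024
    static final int KIBI_BYTES = 1 << 10;
    static final int MEBI_BYTES = 1 << 20;

    // Three spellings of the same 8 MiB default
    static final int AS_LITERAL = 8388608;
    static final int AS_SHIFT = 8 << 20;
    static final int AS_PRODUCT = 8 * MEBI_BYTES;

    // int shifts overflow from 2 GiB upward (2 << 30 is negative), so use a long there
    static final long TWO_GIB = 2L << 30;
}
```

Shifts on `int` overflow once the result reaches 2 GiB (`2 << 30` is negative). Sizes at that scale therefore need a `long` operand, such as `2L << 30`.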
Re: [PR] CASSANDRA-19901: Refactor TokenRangeMapping to use proper types instead of Strings [cassandra-analytics]
frankgh commented on code in PR #78: URL: https://github.com/apache/cassandra-analytics/pull/78#discussion_r1747575920 ## scripts/build-dtest-jars.sh: ## @@ -118,4 +118,6 @@ else exit ${RETURN} fi done + # always delete the Cassandra source after dtest.jar is built to avoid confusing IDE Review Comment: can we alternatively exclude these files from the IDE's indexing? ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java: ## @@ -22,55 +22,131 @@ import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; import com.google.common.collect.TreeRangeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse; +import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo; import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata; -import org.apache.cassandra.spark.bulkwriter.RingInstance; import org.apache.cassandra.spark.common.model.CassandraInstance; import org.apache.cassandra.spark.data.ReplicationFactor; import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.jetbrains.annotations.Nullable; -// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup -public class TokenRangeMapping implements Serializable +public class TokenRangeMapping implements Serializable { private 
static final long serialVersionUID = -7284933683815811160L; +private static final Logger LOGGER = LoggerFactory.getLogger(TokenRangeMapping.class); + private final Partitioner partitioner; private final ReplicationFactor replicationFactor; -private final transient Set replacementInstances; -private final transient RangeMap> replicasByTokenRange; -private final transient Multimap> tokenRangeMap; -private final transient Map> writeReplicasByDC; -private final transient Map> pendingReplicasByDC; -private final transient List replicaMetadata; +private final transient Set allInstances; +private final transient RangeMap> replicasByTokenRange; +private final transient Multimap> tokenRangeMap; +private final transient Map> writeReplicasByDC; +private final transient Map> pendingReplicasByDC; + +public static +TokenRangeMapping create(Supplier topologySupplier, +Supplier partitionerSupplier, +Supplier replicationFactorSupplier, +Function instanceCreator) +{ +TokenRangeReplicasResponse response = topologySupplier.get(); +Map instanceByIpAddress = new HashMap<>(response.replicaMetadata().size()); +response.replicaMetadata() +.forEach((ipAddress, metadata) -> instanceByIpAddress.put(ipAddress, instanceCreator.apply(metadata))); + +Multimap> tokenRangesByInstance = tokenRangesByInstance(response.writeReplicas(), + instanceByIpAddress); + +// Each token range has hosts by DC. 
We collate them across all ranges into all hosts by DC +Map> writeReplicasByDC = new HashMap<>(); +Map> pendingReplicasByDC = new HashMap<>(); +Set allInstances = new HashSet<>(instanceByIpAddress.values()); +for (I instance : allInstances) +{ +Set dc = writeReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>()); +dc.add(instance); +if (instance.nodeState().isPending) +{ +Set pendingInDc = pendingReplicasByDC.computeIfAbsent(instance.datacenter(), k -> new HashSet<>()); +pendingInDc.add(instance); +} +} + +if (LOGGER.isDebugEnabled()) +{ +LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); +} + +Map replicaMetadata = response.r
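The collation loop in `TokenRangeMapping.create` can be exercised in isolation. In this stdlib sketch, `Instance` is a hypothetical stand-in for `CassandraInstance` that has only a name, a datacenter and a pending flag. `collate` follows the same shape as the loop in the diff: every instance lands in its DC's write-replica set, and pending instances are also tracked per DC.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReplicasByDcSketch
{
    // Hypothetical stand-in for CassandraInstance: a name, a datacenter and a pending flag
    static final class Instance
    {
        final String name;
        final String datacenter;
        final boolean pending;

        Instance(String name, String datacenter, boolean pending)
        {
            this.name = name;
            this.datacenter = datacenter;
            this.pending = pending;
        }
    }

    // Both per-DC groupings produced by a single pass
    static final class Collated
    {
        final Map<String, Set<Instance>> writeReplicasByDc = new HashMap<>();
        final Map<String, Set<Instance>> pendingReplicasByDc = new HashMap<>();
    }

    static Collated collate(List<Instance> instances)
    {
        Collated result = new Collated();
        for (Instance instance : new HashSet<>(instances))
        {
            // Every instance is a write replica of its own DC
            result.writeReplicasByDc.computeIfAbsent(instance.datacenter, dc -> new HashSet<>()).add(instance);
            // Pending instances are additionally tracked per DC
            if (instance.pending)
            {
                result.pendingReplicasByDc.computeIfAbsent(instance.datacenter, dc -> new HashSet<>()).add(instance);
            }
        }
        return result;
    }
}
```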
Re: [PR] CASSANDRA-19900: Make the compression cache configurable to reduce he… [cassandra-analytics]
frankgh commented on code in PR #77: URL: https://github.com/apache/cassandra-analytics/pull/77#discussion_r1746183779 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java: ## @@ -210,6 +212,8 @@ public static final class ClientConfig public static final String CHUNK_BUFFER_SIZE_BYTES_KEY = "chunkBufferSizeBytes"; public static final String MAX_POOL_SIZE_KEY = "maxPoolSize"; public static final String TIMEOUT_SECONDS_KEY = "timeoutSeconds"; +public static final String CACHE_COMPRESSION_METADATA_KEY = "cacheCompressionMetadata"; +public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = "maxCacheCompressionMetadata"; Review Comment: Can we add the units for this param? ```suggestion public static final String MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = "maxCacheCompressionMetadataMiB"; ``` ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/Properties.java: ## @@ -40,6 +40,8 @@ public final class Properties public static final int DEFAULT_MAX_RETRIES = 10; public static final long DEFAULT_MILLIS_TO_SLEEP = 500; public static final int DEFAULT_MAX_POOL_SIZE = 64; +public static final boolean DEFAULT_CACHE_COMPRESSION_METADATA_KEY = true; +public static final int DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 8388608; // 8MiB Review Comment: ```suggestion public static final int DEFAULT_MAX_SIZE_CACHE_COMPRESSION_METADATA_KEY = 8 * MEBI_BYTES; // 8MiB ``` ## cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java: ## @@ -398,7 +414,19 @@ public static ClientConfig create(int userProvidedPort, maxPoolSize, timeoutSeconds, maxBufferOverride, -chunkBufferOverride); +chunkBufferOverride, +cacheCompressionMetadata, +maxSizeCacheCompressionMetadata); +} + +public int maxSizeCacheCompressionMetadata() Review Comment: can we add the units here as well? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
Re: [PR] CASSANDRA-19873 Removes checks for blocked instances from bulk-write … [cassandra-analytics]
frankgh merged PR #76: URL: https://github.com/apache/cassandra-analytics/pull/76
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1745910617 ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java: ## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data; + +import org.jetbrains.annotations.NotNull; + +public interface TypeConverter +{ +default Object convert(CqlField.CqlType cqlType, @NotNull Object value) +{ +return convert(cqlType, value, false); +} Review Comment: ok
Re: [I] Donate GoCQL to Apache Cassandra [YOUR ACTION REQUIRED] [cassandra-gocql-driver]
Drahflow commented on issue #1751: URL: https://github.com/apache/cassandra-gocql-driver/issues/1751#issuecomment-2329826936 @michaelsembwever I have received the copyright assignment back from the client who paid for my (minor) contributions, and am happy to donate said copyright to the ASF.
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1742768323 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java: ## @@ -152,151 +153,153 @@ public void testUUID() public void testLong() { qt().forAll(TestUtils.bridges()).checkAssert(bridge -> { - assertTrue(bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE)) instanceof Long); -assertEquals(Long.MAX_VALUE, bridge.bigint().deserialize( -ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE))); + assertTrue(bridge.bigint().deserializeToType(bridge.typeConverter(), bridge.bigint().serialize(Long.MAX_VALUE)) instanceof Long); +assertEquals(Long.MAX_VALUE, bridge.bigint().deserializeToType(bridge.typeConverter(), + ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE))); qt().forAll(integers().all()) -.checkAssert(integer -> assertEquals((long) integer, bridge.bigint().deserialize( -bridge.bigint().serialize((long) integer; -assertEquals(Long.MAX_VALUE, bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE))); -assertEquals(Long.MIN_VALUE, bridge.bigint().deserialize(bridge.bigint().serialize(Long.MIN_VALUE))); +.checkAssert(integer -> assertEquals((long) integer, bridge.bigint().deserializeToType(bridge.typeConverter(), + bridge.bigint().serialize((long) integer; +assertEquals(Long.MAX_VALUE, bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MAX_VALUE))); +assertEquals(Long.MIN_VALUE, bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MIN_VALUE))); qt().withExamples(MAX_TESTS) .forAll(longs().all()) -.checkAssert(aLong -> assertEquals(aLong, bridge.bigint().deserialize( -bridge.bigint().serialize(aLong; +.checkAssert(aLong -> assertEquals(aLong, bridge.bigint().deserializeToType(bridge.typeConverter(), + bridge.bigint().serialize(aLong; }); } @Test public void testDecimal() { qt().forAll(TestUtils.bridges()).checkAssert(bridge -> { 
-assertTrue(bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(500L))) instanceof Decimal); -assertEquals(Decimal.apply(500), bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(500L; -assertNotSame(Decimal.apply(501), bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(500L; -assertEquals(Decimal.apply(-1), bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(-1L; -assertEquals(Decimal.apply(Long.MAX_VALUE), bridge.decimal().deserialize( - bridge.decimal().serialize(BigDecimal.valueOf(Long.MAX_VALUE; -assertEquals(Decimal.apply(Long.MIN_VALUE), bridge.decimal().deserialize( - bridge.decimal().serialize(BigDecimal.valueOf(Long.MIN_VALUE; -assertEquals(Decimal.apply(Integer.MAX_VALUE), bridge.decimal().deserialize( - bridge.decimal().serialize(BigDecimal.valueOf(Integer.MAX_VALUE; -assertEquals(Decimal.apply(Integer.MIN_VALUE), bridge.decimal().deserialize( - bridge.decimal().serialize(BigDecimal.valueOf(Integer.MIN_VALUE; + assertTrue(bridge.decimal().deserializeToType(bridge.typeConverter(), + bridge.decimal().serialize(BigDecimal.valueOf(500L))) instanceof Decimal); +assertEquals(Decimal.apply(500), bridge.decimal().deserializeToType(bridge.typeConverter(), + bridge.decimal().serialize(BigDecimal.valueOf(500L; +assertNotSame(Decimal.apply(501), bridge.decimal().deserializeToType(bridge.typeConverter(), + bridge.decimal().serialize(BigDecimal.valueOf(500L; +assertEquals(Decimal.apply(-1), bridge.decimal().deserializeToType(bridge.typeConverter(), + bridge.decimal().serialize(BigDecimal.valueOf(-1L; +assertEquals(Decimal.apply(Long.MAX_VALUE), bridge.decimal().deserializeToT
Re: [I] Feature Request: Support Vector Type [cassandra-gocql-driver]
rcosnita commented on issue #1734: URL: https://github.com/apache/cassandra-gocql-driver/issues/1734#issuecomment-2327419047

Hello everyone,

For people who are using version 4 of the protocol but need to write vectors from their code base, here is a possible idea:

```go
package warm

import (
	"fmt"
	"math"

	"github.com/gocql/gocql"
)

// encInt encodes a 32-bit integer as 4 big-endian bytes.
func encInt(v int32) []byte {
	return []byte{byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)}
}

// encFloat encodes a float32 through its IEEE 754 bit pattern.
func encFloat(v float32) []byte {
	return encInt(int32(math.Float32bits(v)))
}

type Float32Vector struct {
	value      []float32
	dimensions int
}

// MarshalCQL writes the elements back to back, 4 bytes each, with no length prefix.
func (m *Float32Vector) MarshalCQL(info gocql.TypeInfo) ([]byte, error) {
	if len(m.value) != m.dimensions {
		return nil, fmt.Errorf("float32vector expects size %d but received size %d", m.dimensions, len(m.value))
	}
	var results []byte
	for _, part := range m.value {
		results = append(results, encFloat(part)...)
	}
	return results, nil
}

func (m *Float32Vector) UnmarshalCQL(info gocql.TypeInfo, data []byte) error {
	panic("unmarshalling vector is not fully implemented")
}

func (m *Float32Vector) Value() []float32 {
	return m.value
}

// ensureVectorDimension truncates or zero-pads values to exactly dimensions elements.
func ensureVectorDimension(values []float32, dimensions int) []float32 {
	if len(values) == dimensions {
		return values
	}
	result := make([]float32, dimensions) // zero-initialized, so any padding is already 0.0
	copy(result, values)                  // copies min(len(values), dimensions) elements
	return result
}

func FromVectorFloat32(value []float32, dimensions int) *Float32Vector {
	return &Float32Vector{
		value:      ensureVectorDimension(value, dimensions),
		dimensions: dimensions,
	}
}
```

You can opt to skip the padding completely and return an error if that feels more natural for your use case. The encoding functions are extracted from the existing gocql codebase.
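Since `UnmarshalCQL` above still panics, here is a minimal, stdlib-only sketch of the inverse operation. It assumes the same layout `MarshalCQL` produces (each float32 as 4 big-endian bytes, concatenated with no per-element length prefix). The helper names `encodeFloat32Vector` and `decodeFloat32Vector` are hypothetical and not part of gocql; wiring the decoder into `UnmarshalCQL` would need the gocql import and is left out so the sketch runs on its own.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// encodeFloat32Vector (hypothetical) mirrors MarshalCQL above: n floats become
// 4*n bytes, each element stored as its big-endian IEEE 754 bit pattern.
func encodeFloat32Vector(values []float32, dimensions int) ([]byte, error) {
	if len(values) != dimensions {
		return nil, fmt.Errorf("vector expects %d elements, got %d", dimensions, len(values))
	}
	out := make([]byte, 4*dimensions)
	for i, v := range values {
		binary.BigEndian.PutUint32(out[4*i:], math.Float32bits(v))
	}
	return out, nil
}

// decodeFloat32Vector (hypothetical) is the inverse: it splits data into
// 4-byte chunks and reinterprets each chunk as a big-endian float32.
func decodeFloat32Vector(data []byte, dimensions int) ([]float32, error) {
	if len(data) != 4*dimensions {
		return nil, fmt.Errorf("vector of %d floats needs %d bytes, got %d", dimensions, 4*dimensions, len(data))
	}
	out := make([]float32, dimensions)
	for i := range out {
		out[i] = math.Float32frombits(binary.BigEndian.Uint32(data[4*i:]))
	}
	return out, nil
}

func main() {
	encoded, err := encodeFloat32Vector([]float32{1.5, -2, 0.25}, 3)
	if err != nil {
		panic(err)
	}
	decoded, err := decodeFloat32Vector(encoded, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(encoded), decoded) // prints "12 [1.5 -2 0.25]"
}
```

Checking the byte length against the declared dimension up front means a malformed payload fails with an error instead of an index-out-of-range panic.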
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
OleksiienkoMykyta commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2326628144 @Rikkuru Thank you for providing the details. I reproduced the issue you faced, and as you said, according to the documentation this is misbehavior. I added a small fix, retested it, and it works properly now.
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
Rikkuru commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2325019139

> Please share an exec query you are using to reproduce the issue, if you tried multiple queries provide them as well.

Hi, I have an example main.go (go 1.21.11, github.com/gocql/gocql v1.6.0). There is a loop that waits for ScyllaDB to start and a second loop that leaves time to take down one of the nodes.

```
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gocql/gocql"
)

type MyRetryPolicy struct {
}

// Attempt allows a retry while fewer than 3 attempts have been made.
func (*MyRetryPolicy) Attempt(q gocql.RetryableQuery) bool {
	if q.Attempts() > 2 {
		return false
	}
	return true
}

func (*MyRetryPolicy) GetRetryType(error) gocql.RetryType {
	return gocql.Retry
}

type LoggingObserver struct{}

func (*LoggingObserver) ObserveQuery(ctx context.Context, q gocql.ObservedQuery) {
	fmt.Printf("observer attempt: %d err: %s\n", q.Attempt, q.Err)
}

func main() {
	cluster := gocql.NewCluster("some-scylla", "some-scylla3", "some-scylla2")
	var err error
	var s *gocql.Session
	// Wait for the ScyllaDB cluster to come up.
	for i := 1; i <= 100; i++ {
		fmt.Printf("%d attempt\n", i)
		s, err = gocql.NewSession(*cluster)
		if err == nil {
			break
		}
		time.Sleep(1 * time.Second)
	}
	if err != nil {
		fmt.Printf("session err: %s\n", err)
		return
	}
	defer s.Close()
	fmt.Printf("CONNECTED!\n")
	// Leave time to take down one of the nodes.
	time.Sleep(60 * time.Second)
	for i := 1; i <= 100; i++ {
		q := s.Query("INSERT INTO mykeyspace.events(event_id, time, args) VALUES (?,?,?)", 1, gocql.UUIDFromTime(time.Now()), "test")
		q.RetryPolicy(&MyRetryPolicy{})
		q.Observer(&LoggingObserver{})
		q.Consistency(gocql.All)
		fmt.Printf("%d QUERY IsIdempotent: %v\n", i, q.IsIdempotent())
		err = q.Exec()
		if err != nil {
			fmt.Printf("exec err: %s\n", err)
		}
		time.Sleep(1 * time.Second)
	}
}
```

CQL init for Scylla:

```
CREATE KEYSPACE mykeyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1': '3'} AND durable_writes = true;
CREATE TABLE mykeyspace.events (event_id int, time timeuuid, args text, PRIMARY KEY ((event_id), time));
```

docker-compose for the nodes and main.go:

```
version: '3'
services:
  some-scylla:
    image: scylladb/scylla:5.1
    container_name: some-scylla
    command: --overprovisioned 1 --smp 1
    ports:
      - 9042:9042
  some-scylla2:
    image: scylladb/scylla:5.1
    container_name: some-scylla2
    command: --seeds=some-scylla --overprovisioned 1 --smp 1
  some-scylla3:
    image: scylladb/scylla:5.1
    container_name: some-scylla3
    command: --seeds=some-scylla --overprovisioned 1 --smp 1
  myapp2:
    image: golang:1.21.11
    container_name: myapp2
    command: go run /app/main.go
    working_dir: /app
    volumes:
      - ./:/app
```

I can see in the log that the query is retried even though it is not idempotent:

```
2024-09-02 18:40:30 90 QUERY IsIdempotent: false
2024-09-02 18:40:30 observer attempt: 0 err: Cannot achieve consistency level for cl ALL. Requires 3, alive 2
2024-09-02 18:40:30 observer attempt: 1 err: Cannot achieve consistency level for cl ALL. Requires 3, alive 2
2024-09-02 18:40:30 observer attempt: 2 err: Cannot achieve consistency level for cl ALL. Requires 3, alive 2
2024-09-02 18:40:30 exec err: Cannot achieve consistency level for cl ALL. Requires 3, alive 2
```
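The log above shows a non-idempotent INSERT being executed three times (attempts 0, 1 and 2). Until the driver gates retries on idempotence itself, a policy can do the gating. The sketch below is stdlib-only and deliberately does not import gocql: `retryableQuery`, `idempotentOnlyPolicy` and `fakeQuery` are hypothetical names. It assumes the query object handed to the policy can report `IsIdempotent()`, which `*gocql.Query` does. Whether the `gocql.RetryableQuery` your driver version passes to `Attempt` exposes that method (for example through a type assertion) is an assumption to verify.

```go
package main

import "fmt"

// retryableQuery is a hypothetical stand-in for the two methods this policy needs.
type retryableQuery interface {
	Attempts() int
	IsIdempotent() bool
}

// idempotentOnlyPolicy allows at most maxRetries retries, and only for
// queries marked idempotent.
type idempotentOnlyPolicy struct {
	maxRetries int
}

// Attempt reports whether another execution should be made. Attempts() is the
// number of executions so far, so maxRetries=2 allows 3 executions in total,
// matching MyRetryPolicy in the reproduction above.
func (p idempotentOnlyPolicy) Attempt(q retryableQuery) bool {
	if !q.IsIdempotent() {
		return false // never replay a non-idempotent write
	}
	return q.Attempts() <= p.maxRetries
}

// fakeQuery lets the policy be exercised without a cluster.
type fakeQuery struct {
	attempts   int
	idempotent bool
}

func (f fakeQuery) Attempts() int      { return f.attempts }
func (f fakeQuery) IsIdempotent() bool { return f.idempotent }

func main() {
	p := idempotentOnlyPolicy{maxRetries: 2}
	fmt.Println(p.Attempt(fakeQuery{attempts: 1, idempotent: false})) // false
	fmt.Println(p.Attempt(fakeQuery{attempts: 1, idempotent: true}))  // true
	fmt.Println(p.Attempt(fakeQuery{attempts: 3, idempotent: true}))  // false
}
```

Checking idempotence before the attempt budget keeps the two concerns separate, so the retry limit only applies to queries that are safe to replay in the first place.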
Re: [I] Retry type Ignore does not clear error [cassandra-gocql-driver]
Rikkuru commented on issue #1808: URL: https://github.com/apache/cassandra-gocql-driver/issues/1808#issuecomment-2324611544

Example (on an empty cluster; the keyspace error is fine for our test). MyRetryPolicy can return Ignore or Rethrow; it changes nothing.

```
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gocql/gocql"
)

type MyRetryPolicy struct {
}

func (*MyRetryPolicy) Attempt(q gocql.RetryableQuery) bool {
	if q.Attempts() > 2 {
		return false
	}
	return true
}

// GetRetryType asks the driver to ignore every error.
func (*MyRetryPolicy) GetRetryType(error) gocql.RetryType {
	return gocql.Ignore
}

type LoggingObserver struct{}

func (*LoggingObserver) ObserveQuery(ctx context.Context, q gocql.ObservedQuery) {
	fmt.Printf("observer attempt: %d err: %s\n", q.Attempt, q.Err)
}

func main() {
	cluster := gocql.NewCluster("node-1")
	cluster.Authenticator = gocql.PasswordAuthenticator{
		Username: "scylla",
		Password: "***",
	}
	s, err := gocql.NewSession(*cluster)
	if err != nil {
		fmt.Printf("session err: %s\n", err)
		return // s is nil here, so deferring s.Close() would panic
	}
	defer s.Close()
	q := s.Query("INSERT INTO my.events(event_id, time, args) VALUES (?,?,?)", 1, gocql.UUIDFromTime(time.Now()), "test")
	q.RetryPolicy(&MyRetryPolicy{})
	q.Observer(&LoggingObserver{})
	fmt.Printf("QUERY IsIdempotent: %v\n", q.IsIdempotent())
	err = q.Exec()
	if err != nil {
		fmt.Printf("exec err: %s\n", err)
	}
}
```

Output:

```
➜ test go run main.go
QUERY IsIdempotent: false
observer attempt: 0 err: Keyspace my does not exist
exec err: Keyspace my does not exist
```
[I] Retry type Ignore does not clear error [cassandra-gocql-driver]
Rikkuru opened a new issue, #1808: URL: https://github.com/apache/cassandra-gocql-driver/issues/1808

### What version of Cassandra are you using?

Scylla Enterprise 2024.1.8

### What version of Gocql are you using?

github.com/gocql/gocql v1.6.0

### What version of Go are you using?

go 1.21.11

### What did you do?

Used RetryType Ignore (https://pkg.go.dev/github.com/gocql/gocql#RetryType) on a write query.

### What did you expect to see?

I expected that query.Exec() would not return an error.

### What did you see instead?

The error is returned.

---

Maybe the documentation is not clear on how Ignore and Rethrow differ? I thought Ignore would make Exec not return an error. That might be strange for read operations, but it can be useful for writes (in a downgrading-CL retry policy, for example). However, both return the error, and I don't see any difference between these retry types. Is there any difference between the Rethrow and Ignore retry types?
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c merged PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
yifan-c commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1739179691 ## cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/BinaryTraits.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data.converter.types; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.bridge.BigNumberConfig; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.utils.ByteBufferUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.jetbrains.annotations.NotNull; + +public interface BinaryTraits extends SparkType Review Comment: Thank you
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1739087029 ## cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/BinaryTraits.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data.converter.types; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.bridge.BigNumberConfig; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.utils.ByteBufferUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.jetbrains.annotations.NotNull; + +public interface BinaryTraits extends SparkType Review Comment: I'll go ahead and remove it, but I think "trait" or "mixin" is a common term not necessarily tied to any particular language.
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
Rikkuru commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2320933671

```
SELECT event_id, time, args FROM my.events WHERE user = ? AND project_id = ? AND year = ? AND week = ? ORDER BY time DESC LIMIT ?
```

Something like that on read, with:

```
CREATE TABLE my.events (
    user text,
    year smallint,
    week tinyint,
    project_id smallint,
    time timeuuid,
    args map,
    event_id smallint,
    ip inet,
    PRIMARY KEY ((user, year, week, project_id), time)
)
```

I will try to find time over the weekend to create a simpler example in a sandbox.
[I] Trouble with DCAware or RackAware HostSelectionPolicy [cassandra-gocql-driver]
ut0mt8 opened a new issue, #1807: URL: https://github.com/apache/cassandra-gocql-driver/issues/1807

### What version of Cassandra are you using?

Cassandra 3.11.14 | CQL spec 3.4.4 | Native protocol v4

### What version of Gocql are you using?

v1.6.0

### What version of Go are you using?

go version go1.22.2

### What did you do?

I have a worldwide C* cluster divided into 3 regions (DCs) and multiple AZs (racks). I want to connect and read as locally as possible (same rack). So I'm testing: just connecting and running a few SELECTs to see how reads and connections are distributed.

### What did you expect to see?

I expected queries to go only to my local DC or local rack.

### What did you see instead?

Instead, they are distributed randomly among the nodes worldwide. Note that with a HostFilter it works (but it's not convenient at all). It's as if the driver doesn't understand my cluster topology; see the clues in the debug output (if I read the code correctly, the 2 node IPs below mean global).

---

### Describe your Cassandra cluster

Please provide the following information:

- output of `nodetool status`

` Datacenter: ap-southeast-1 == Status=Up/Down |/ State=Normal/Leaving/Joining/Moving -- AddressLoad Tokens Owns (effective) Host ID Rack UN 10.80.197.48 84.27 GiB 256 17.1% 4d1a1159-fc5d-431c-a420-8bff72ef2484 ap-southeast-1b UN 10.80.111.193 79.07 GiB 256 16.6% 0a69d219-f42f-4de9-9c8d-7efc06355e59 ap-southeast-1a UN 10.80.197.197 85.49 GiB 256 17.7% bb87ff84-4b68-46d9-87a3-e1e8d0d6fe07 ap-southeast-1b UN 10.80.197.198 78.92 GiB 256 17.6% db91859b-4759-4d69-ab99-65ffe97a9deb ap-southeast-1b UN 10.80.197.103 79.37 GiB 256 16.5% ab22efbd-585a-4c4e-adb9-b0ce4a56e8f0 ap-southeast-1b UN 10.80.197.104 66.52 GiB 256 14.6% 552d1660-fec5-4205-bfa6-992a30ec702c ap-southeast-1b UN 10.80.197.121 77.52 GiB 256 16.6% d326d322-ca5e-4d6d-b052-2971c6ea57cf ap-southeast-1b UN 10.80.111.234 77.78 GiB 256 16.1% e0974eb1-7b9b-487c-bcbe-8e16f50d6508 ap-southeast-1a UN 10.80.111.235 75.12 GiB 256 16.7%
29d1bcc3-4a60-477d-8980-d6b7083d2541 ap-southeast-1a UN 10.80.111.189 84.49 GiB 256 17.5% 6d7ea647-5a14-4462-9118-ce20ce6b4602 ap-southeast-1a UN 10.80.111.126 84.62 GiB 256 17.1% 544ce9c7-71ea-47c7-80c7-32dc06ed6b5e ap-southeast-1a UN 10.80.111.190 74.33 GiB 256 16.1% 650975be-09cf-4db5-9a45-3f99a03bdfea ap-southeast-1a Datacenter: eu-west-1 = Status=Up/Down |/ State=Normal/Leaving/Joining/Moving -- AddressLoad Tokens Owns (effective) Host ID Rack UN 10.0.111.3299.37 GiB 256 33.6% 7f54a4b0-b30a-4043-854a-7efec3604463 eu-west-1a UN 10.0.111.145 96.86 GiB 256 32.3% 1222059b-bf71-4df9-ab78-2d9fd420f29b eu-west-1a UN 10.0.111.17105.54 GiB 256 35.0% 42d01053-1bbe-4fef-aed8-8f3bc201fab5 eu-west-1a UN 10.0.111.6586.03 GiB 256 32.0% f4a7b146-80a1-408e-a31a-893e3465a271 eu-west-1a UN 10.0.111.146 91.01 GiB 256 32.9% 485334d7-11c6-4716-bbe1-4f09db7c41eb eu-west-1a UN 10.0.111.147 98.04 GiB 256 33.9% 62377c8a-61bd-4bb2-b434-c607a5eba1d2 eu-west-1a UN 10.0.197.246 95.4 GiB 256 32.4% 31905c62-dd32-4acc-b247-90899f8069ca eu-west-1b UN 10.0.197.8898.83 GiB 256 34.6% 8b42330a-e209-4e43-9174-e7b8c23eaa80 eu-west-1b UN 10.0.197.122 106.38 GiB 256 34.8% 1336edb6-5dec-4826-b263-4263b7ff24d3 eu-west-1b UN 10.0.197.7494.17 GiB 256 32.4% fe63b3cb-f129-4bd8-b879-63b74730d324 eu-west-1b UN 10.0.197.123 92.88 GiB 256 33.7% 440d4fa5-2f45-472d-8721-fbda44401804 eu-west-1b UN 10.0.197.124 89.06 GiB 256 32.4% 45ba9039-14e4-4082-9243-e5b7465be36f eu-west-1b Datacenter: us-east-1 = Status=Up/Down |/ State=Normal/Leaving/Joining/Moving -- AddressLoad Tokens Owns (effective) Host ID Rack UN 10.66.201.210 102.07 GiB 256 24.8% 7f369bcf-12c1-4d3a-aae2-53fe06bbff06 us-east-1c UN 10.66.200.192 104.88 GiB 256 25.3% dbd379f8-545a-412c-82e1-c218f19906c3 us-east-1d UN 10.66.111.5102.18 GiB 256 24.4% 0148c92a-482a-4b5d-98df-d1ac88b40ffe us-eas
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
OleksiienkoMykyta commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2320418177 Please share an exec query you are using to reproduce the issue; if you tried multiple queries, provide them as well.
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737411842 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -227,10 +230,18 @@ protected String getCurrentKeyspaceSchema() throws Exception return schemaResponse.schema(); } -private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() throws ExecutionException, InterruptedException +private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() { CassandraContext context = getCassandraContext(); -return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get(); +try +{ +return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get(); +} +catch (ExecutionException | InterruptedException exception) +{ +LOGGER.error("Failed to get token ranges", exception); +throw new SidecarApiCallException("Failed to get token ranges", exception); Review Comment: done
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
yifan-c commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1737158816 ## cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/BinaryTraits.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data.converter.types; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.bridge.BigNumberConfig; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.utils.ByteBufferUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.jetbrains.annotations.NotNull; + +public interface BinaryTraits extends SparkType Review Comment: I was confused by the name. At first sight, I thought it was Scala source code, since `trait` is Scala's counterpart to an interface. Can you please avoid the word `trait` for Java interfaces? This patch adds many traits.
## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java: ## @@ -152,151 +153,153 @@ public void testUUID() public void testLong() { qt().forAll(TestUtils.bridges()).checkAssert(bridge -> { - assertTrue(bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE)) instanceof Long); -assertEquals(Long.MAX_VALUE, bridge.bigint().deserialize( -ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE))); + assertTrue(bridge.bigint().deserializeToType(bridge.typeConverter(), bridge.bigint().serialize(Long.MAX_VALUE)) instanceof Long); +assertEquals(Long.MAX_VALUE, bridge.bigint().deserializeToType(bridge.typeConverter(), + ByteBuffer.allocate(8).putLong(0, Long.MAX_VALUE))); qt().forAll(integers().all()) -.checkAssert(integer -> assertEquals((long) integer, bridge.bigint().deserialize( -bridge.bigint().serialize((long) integer; -assertEquals(Long.MAX_VALUE, bridge.bigint().deserialize(bridge.bigint().serialize(Long.MAX_VALUE))); -assertEquals(Long.MIN_VALUE, bridge.bigint().deserialize(bridge.bigint().serialize(Long.MIN_VALUE))); +.checkAssert(integer -> assertEquals((long) integer, bridge.bigint().deserializeToType(bridge.typeConverter(), + bridge.bigint().serialize((long) integer; +assertEquals(Long.MAX_VALUE, bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MAX_VALUE))); +assertEquals(Long.MIN_VALUE, bridge.bigint().deserializeToJavaType(bridge.bigint().serialize(Long.MIN_VALUE))); qt().withExamples(MAX_TESTS) .forAll(longs().all()) -.checkAssert(aLong -> assertEquals(aLong, bridge.bigint().deserialize( -bridge.bigint().serialize(aLong; +.checkAssert(aLong -> assertEquals(aLong, bridge.bigint().deserializeToType(bridge.typeConverter(), + bridge.bigint().serialize(aLong; }); } @Test public void testDecimal() { qt().forAll(TestUtils.bridges()).checkAssert(bridge -> { -assertTrue(bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(500L))) instanceof Decimal); 
-assertEquals(Decimal.apply(500), bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(500L; -assertNotSame(Decimal.apply(501), bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.valueOf(500L; -assertEquals(Decimal.apply(-1), bridge.decimal().deserialize( -bridge.decimal().serialize(BigDecimal.val
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737326950 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -381,67 +392,68 @@ private InstanceAvailability determineInstanceAvailability(RingInstance instance private TokenRangeMapping getTokenRangeReplicas() { -Map> writeReplicasByDC; -Map> pendingReplicasByDC; -Map replicaMetadata; -Set blockedInstances; +return getTokenRangeReplicas(this::getTokenRangesAndReplicaSets, + this::getPartitioner, + this::getReplicationFactor, + this::instanceIsBlocked); +} + +@VisibleForTesting +static TokenRangeMapping getTokenRangeReplicas(Supplier topologySupplier, + Supplier partitionerSupplier, + Supplier replicationFactorSupplier, + Predicate blockedInstancePredicate) +{ +long start = System.nanoTime(); +TokenRangeReplicasResponse response = topologySupplier.get(); +long elapsedTimeNanos = System.nanoTime() - start; +Multimap> tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), + response.replicaMetadata()); +LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} milliseconds", +tokenRangesByInstance.size(), +TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos)); + Set replacementInstances; -Multimap> tokenRangesByInstance; -try -{ -long start = System.nanoTime(); -TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); -long elapsedTimeNanos = System.nanoTime() - start; -replicaMetadata = response.replicaMetadata(); - -tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); -LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} nanoseconds", -tokenRangesByInstance.size(), -elapsedTimeNanos); - -replacementInstances = response.replicaMetadata() - .values() - .stream() - .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name())) - .map(RingInstance::new) - 
.collect(Collectors.toSet()); - -blockedInstances = response.replicaMetadata() +replacementInstances = response.replicaMetadata() .values() .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name())) .map(RingInstance::new) - .filter(this::instanceIsBlocked) .collect(Collectors.toSet()); -Set blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address()) - .collect(Collectors.toSet()); +Set blockedInstances; +blockedInstances = response.replicaMetadata() Review Comment: done.
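The refactoring above turns the lookup into a `@VisibleForTesting static` method. Its inputs arrive through `Supplier`s and a `Predicate`, so a unit test can inject a canned topology without a live cluster. It also reports elapsed time in milliseconds via `TimeUnit.NANOSECONDS.toMillis` instead of raw nanoseconds.

The sketch below shows that shape in reduced form. The names are hypothetical, and a `List<String>` of addresses stands in for `TokenRangeReplicasResponse`.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

final class TopologyLookupSketch
{
    // Result holder: instances matched by the blocked-instance predicate plus the fetch latency.
    static final class Result
    {
        final List<String> blocked;
        final long elapsedMillis;

        Result(List<String> blocked, long elapsedMillis)
        {
            this.blocked = blocked;
            this.elapsedMillis = elapsedMillis;
        }
    }

    // Static and dependency-injected: tests pass stubs instead of calling Sidecar.
    static Result lookup(Supplier<List<String>> topologySupplier, Predicate<String> blockedInstancePredicate)
    {
        long start = System.nanoTime();
        List<String> instances = topologySupplier.get();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        List<String> blocked = instances.stream()
                                        .filter(blockedInstancePredicate)
                                        .collect(Collectors.toList());
        return new Result(blocked, elapsedMillis);
    }
}
```

In production code the instance method would pass method references such as `this::instanceIsBlocked`. A test can pass plain lambdas instead.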
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737305375 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -478,21 +490,21 @@ private Map> getPendingReplicas(TokenRangeReplicasResponse r // Filter writeReplica entries and the value replicaSet to only include those with pending replicas return writeReplicasByDC.entrySet() .stream() -.filter(e -> e.getValue().stream() +.filter(e -> e.getValue().stream() // todo: transformToHostWithoutPort is called twice for entries .anyMatch(v -> pendingReplicas.contains(transformToHostWithoutPort(v .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().stream() .filter(v -> pendingReplicas.contains(transformToHostWithoutPort(v))) .collect(Collectors.toSet(; } -private String transformToHostWithoutPort(String v) +private static String transformToHostWithoutPort(String v) Review Comment: Talked offline; decided not to touch it, as it is unrelated to the patch.
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
JeetKunDoug commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737113324 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java: ## @@ -164,7 +164,13 @@ public BulkSparkConf(SparkConf conf, Map options) this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true, "skip extended verification of SSTables by Cassandra"); this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM")); -this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null); +String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null); Review Comment: 👍 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -381,67 +392,68 @@ private InstanceAvailability determineInstanceAvailability(RingInstance instance private TokenRangeMapping getTokenRangeReplicas() { -Map> writeReplicasByDC; -Map> pendingReplicasByDC; -Map replicaMetadata; -Set blockedInstances; +return getTokenRangeReplicas(this::getTokenRangesAndReplicaSets, + this::getPartitioner, + this::getReplicationFactor, + this::instanceIsBlocked); +} + +@VisibleForTesting +static TokenRangeMapping getTokenRangeReplicas(Supplier topologySupplier, + Supplier partitionerSupplier, + Supplier replicationFactorSupplier, + Predicate blockedInstancePredicate) +{ +long start = System.nanoTime(); +TokenRangeReplicasResponse response = topologySupplier.get(); +long elapsedTimeNanos = System.nanoTime() - start; +Multimap> tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), + response.replicaMetadata()); +LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} milliseconds", +tokenRangesByInstance.size(), +TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos)); + Set replacementInstances; -Multimap> 
tokenRangesByInstance; -try -{ -long start = System.nanoTime(); -TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); -long elapsedTimeNanos = System.nanoTime() - start; -replicaMetadata = response.replicaMetadata(); - -tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); -LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} nanoseconds", -tokenRangesByInstance.size(), -elapsedTimeNanos); - -replacementInstances = response.replicaMetadata() - .values() - .stream() - .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name())) - .map(RingInstance::new) - .collect(Collectors.toSet()); - -blockedInstances = response.replicaMetadata() +replacementInstances = response.replicaMetadata() .values() .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name())) .map(RingInstance::new) - .filter(this::instanceIsBlocked) .collect(Collectors.toSet()); -Set blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address()) - .collect(Collectors.toSet()); +Set blockedInstances; +blockedInstances = response.replicaMetadata() Review Comment: NIT: Since we're iterating over the entire `replicaMetadata()` collection and converting them all to `RingInstance` instances here, maybe move this up above the `replacementInstances` assignment and hold on to the complete list of instances? That way we only create them once, and can get rid of the `.values().stream()` duplication, and can filter on the instance state in the RingInstance rather than doing t
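The NIT above suggests materializing every `replicaMetadata()` entry as a `RingInstance` once. Both the replacing set and the blocked set would then be derived from that single list.

Below is a minimal sketch of the idea, with some simplifications:
- `Instance` is a hypothetical stand-in for `RingInstance`.
- The input map (address to state) simplifies the Sidecar metadata.
- The sketch returns address sets for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class InstancePartitionSketch
{
    // Hypothetical stand-in for RingInstance: an address plus its reported state.
    static final class Instance
    {
        final String address;
        final String state;

        Instance(String address, String state)
        {
            this.address = address;
            this.state = state;
        }
    }

    static final class Partition
    {
        final Set<String> replacing;
        final Set<String> blocked;

        Partition(Set<String> replacing, Set<String> blocked)
        {
            this.replacing = replacing;
            this.blocked = blocked;
        }
    }

    static Partition partition(Map<String, String> stateByAddress, Predicate<Instance> isBlocked)
    {
        // Each instance is constructed exactly once.
        List<Instance> all = stateByAddress.entrySet()
                                           .stream()
                                           .map(e -> new Instance(e.getKey(), e.getValue()))
                                           .collect(Collectors.toList());
        // Both sets come from the same list; state is read from the instance, not the raw metadata.
        Set<String> replacing = all.stream()
                                   .filter(i -> i.state.equalsIgnoreCase("REPLACING"))
                                   .map(i -> i.address)
                                   .collect(Collectors.toSet());
        Set<String> blocked = all.stream()
                                 .filter(isBlocked)
                                 .map(i -> i.address)
                                 .collect(Collectors.toSet());
        return new Partition(replacing, blocked);
    }
}
```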
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
frankgh commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1737165237 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -478,21 +490,21 @@ private Map> getPendingReplicas(TokenRangeReplicasResponse r // Filter writeReplica entries and the value replicaSet to only include those with pending replicas return writeReplicasByDC.entrySet() .stream() -.filter(e -> e.getValue().stream() +.filter(e -> e.getValue().stream() // todo: transformToHostWithoutPort is called twice for entries .anyMatch(v -> pendingReplicas.contains(transformToHostWithoutPort(v .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().stream() .filter(v -> pendingReplicas.contains(transformToHostWithoutPort(v))) .collect(Collectors.toSet(; } -private String transformToHostWithoutPort(String v) +private static String transformToHostWithoutPort(String v) Review Comment: not ipv6 friendly? ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -227,10 +230,18 @@ protected String getCurrentKeyspaceSchema() throws Exception return schemaResponse.schema(); } -private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() throws ExecutionException, InterruptedException +private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() { CassandraContext context = getCassandraContext(); -return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get(); +try +{ +return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get(); +} +catch (ExecutionException | InterruptedException exception) +{ +LOGGER.error("Failed to get token ranges", exception); +throw new SidecarApiCallException("Failed to get token ranges", exception); Review Comment: can we add the keyspace information here? 
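The review question above asks whether `transformToHostWithoutPort` is IPv6-friendly. Its body is not shown in this thread. If it splits on the first `:` (an assumption), an address such as `2001:db8::1` would be truncated.

Below is a hedged sketch of a bracket-aware alternative, using a hypothetical `hostWithoutPort` helper.

```java
final class HostPortSketch
{
    // Hypothetical helper: strips ":port" from "host:port" or "[ipv6]:port".
    static String hostWithoutPort(String hostAndPort)
    {
        if (hostAndPort.startsWith("["))
        {
            int close = hostAndPort.indexOf(']');
            if (close < 0)
            {
                throw new IllegalArgumentException("Unterminated IPv6 literal: " + hostAndPort);
            }
            return hostAndPort.substring(1, close); // drop brackets and any trailing ":port"
        }
        int first = hostAndPort.indexOf(':');
        int last = hostAndPort.lastIndexOf(':');
        if (first >= 0 && first == last)
        {
            return hostAndPort.substring(0, first); // exactly one colon: IPv4 or hostname with port
        }
        // No colon, or several colons (bare IPv6): treat the whole string as the host.
        return hostAndPort;
    }
}
```

An unbracketed IPv6 address with a port appended is inherently ambiguous, so the sketch leaves multi-colon input untouched. Guava's `HostAndPort.fromString(...).getHost()` applies similar bracket rules and may be worth considering, since Guava already appears in this project's build.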
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
yifan-c commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1737138827 ## cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java: ## @@ -473,6 +462,11 @@ public void write(Kryo kryo, Output output, CqlField field) public static UnsupportedOperationException notImplemented(CqlType type) { -return new UnsupportedOperationException(type.toString() + " type not implemented"); +return notImplemented(type.toString()); +} + +public static UnsupportedOperationException notImplemented(String type) +{ +return new UnsupportedOperationException(type + " type not implemented"); } Review Comment: Not a blocker. Just want to avoid any confusion. As long as they are used in the context of "type" only, it is good.
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#issuecomment-2318616264 Latest CircleCI: https://app.circleci.com/pipelines/github/jberragan/cassandra-analytics/73/workflows/3f5e406a-98cd-44e0-9b2a-7001975742e6
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
Rikkuru commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2317467101 I tested retries with `gocql.DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: []gocql.Consistency{gocql.One}}` and CL=Quorum on 2 nodes, one of which is down. The first attempt gets an error (reads and writes get the same error). The observer logs this on the first attempt:
```
gogateway[12476]: 12:14:38.938400 error: scylla [4055867439]: pool [3]: host [runner1***]: attempt [0]: read query failed: Cannot achieve consistency level for cl QUORUM. Requires 2, alive 1
```
On the retry, the observer logs an attempt with the lower CL (this one succeeds):
```
gogateway[12476]: 12:14:38.944768 warn: scylla [4055867439]: pool [3]: host [runner1***]: attempt [1]: read CL downgrade QUORUM -> ONE
```
Our query does not set Idempotent, so it shouldn't have been retried:
```go
func (r *Reader) initQuery(req ReadRequestInterface) *gocql.Query {
	q := r.session.Query(getQueryString(req))
	q.Observer(r)
	q.Prefetch(*r.cfg.ReadPrefetchFactor)
	q.PageSize(*r.cfg.PageSize)
	q.RetryPolicy(r.retryPolicy)
	q.SetConsistency(r.consistencyLevel)
	return q
}
```
We also don't change DefaultIdempotence in the cluster config.
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
Rikkuru commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2317407935 > `Query failed: Operation timed out for system_schema.keyspaces - received only 1 responses from 3 CL=THREE.`, tested with `cluster.RetryPolicy = &gocql.DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: []gocql.Consistency{gocql.Two, gocql.One}}` and `cluster.Consistency = gocql.Three`. Was this a write or a read query? Just checking, because a write timeout is not retried by DowngradingConsistencyRetryPolicy if some nodes responded.
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
OleksiienkoMykyta commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2317300586 I have tested it and found that queries are not idempotent by default (`IsIdempotent = false`) unless you set it explicitly. Also, I didn't find any retries; I just get a single error when the consistency level cannot be met: `Query failed: Operation timed out for system_schema.keyspaces - received only 1 responses from 3 CL=THREE.`, tested with `cluster.RetryPolicy = &gocql.DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: []gocql.Consistency{gocql.Two, gocql.One}}` and `cluster.Consistency = gocql.Three`. It looks like everything works according to spec, but maybe I missed something. Please provide more details on how you are getting retries (a code example or similar); it would also help if you could share the relevant logs.
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1735348059 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ## @@ -56,35 +56,38 @@ default void ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor, cl.name() + " only make sense for NetworkTopologyStrategy keyspaces"); } +default int quorum(Set group) +{ +return group.size() / 2 + 1; +} + /** - * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor. - * - * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes. - * This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy. - * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy - * (and non-pending) nodes. - * - * For both the above cases, blocked instances are also considered as failures while performing consistency checks. - * Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances. - * This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes. - * + * Check if the consistency guarantee is maintained for write + * + * The check takes the writeReplicas, pendingReplicas and failedReplicas of the token range into consideration. + * The input parameters are expected to be prefiltered by datacenter, + * making the implementations of QUORUM, LOCAL_QUORUM and EACH_QUORUM identical. 
+ * When pendingReplicas is non-empty, minimum number of success is increased by the size of pendingReplicas, + * keeping the same semantics defined in Cassandra. + * See blockForWrite: https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/ConsistencyLevel.java#L172 + * + * For example, say RF == 3, and there is 2 pending replicas. + * + * QUORUM write consistency requires at least 4 replicas to succeed, i.e. quorum(3) + 2, tolerating 1 failure + * ONE write consistency requires at least 3 replicas to succeed, i.e. 1 + 2, tolerating 2 failure + * TWO write consistency requires at least 4 replicas to succeed, i.e. 2 + 2, tolerating 1 failure + * * - * @param writeReplicas the set of replicas for write operations - * @param pendingReplicas the set of replicas pending status - * @param replacementInstances the set of instances that are replacing the other instances - * @param blockedInstances the set of instances that have been blocked for the bulk operation - * @param failedInstanceIps the set of instances where there were failures - * @param localDC the local datacenter used for consistency level, or {@code null} if not provided - * @return {@code true} if the consistency has been met, {@code false} otherwise + * @param writeReplicas the set of write replicas. The set includes the pending replicas + * @param pendingReplicas the set of pending replicas, which increases the minimum number of success required + * @param failedReplicas the set of replicas fail to write to + * @return true if the write consistency has been met, false otherwise */ -boolean checkConsistency(Set writeReplicas, - Set pendingReplicas, - Set replacementInstances, - Set blockedInstances, - Set failedInstanceIps, - String localDC); // todo: simplify the parameter list. not all are required in impl +boolean checkWriteConsistency(Set writeReplicas, + Set pendingReplicas, + Set failedReplicas); Review Comment: Largely refactored the classes.
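The javadoc above defines the minimum number of successful writes as the consistency level's base requirement plus the number of pending replicas. The sketch below reproduces its worked example (RF = 3 with 2 pending replicas). It rests on a few assumptions:
- Only ONE, TWO and QUORUM are modelled.
- Inputs are already filtered to a single datacenter, as the javadoc states.
- `pendingReplicas` is a subset of `writeReplicas`.

Class and method names are illustrative, not the project's API.

```java
import java.util.Set;

final class WriteConsistencySketch
{
    // Only the consistency levels used in the javadoc example are modelled.
    enum CL { ONE, TWO, QUORUM }

    static int quorum(int replicationFactor)
    {
        return replicationFactor / 2 + 1;
    }

    // Minimum successful writes: the level's base requirement plus one per pending replica.
    static int blockForWrite(CL cl, int replicationFactor, int pendingCount)
    {
        int base;
        switch (cl)
        {
            case ONE:
                base = 1;
                break;
            case TWO:
                base = 2;
                break;
            case QUORUM:
                base = quorum(replicationFactor);
                break;
            default:
                throw new IllegalArgumentException("Unsupported consistency level: " + cl);
        }
        return base + pendingCount;
    }

    // Assumes pendingReplicas is a subset of writeReplicas and all sets are pre-filtered to one DC.
    static boolean checkWriteConsistency(CL cl,
                                         Set<String> writeReplicas,
                                         Set<String> pendingReplicas,
                                         Set<String> failedReplicas)
    {
        int replicationFactor = 0; // natural (non-pending) replicas
        int succeeded = 0;
        for (String replica : writeReplicas)
        {
            if (!pendingReplicas.contains(replica))
            {
                replicationFactor++;
            }
            if (!failedReplicas.contains(replica))
            {
                succeeded++;
            }
        }
        return succeeded >= blockForWrite(cl, replicationFactor, pendingReplicas.size());
    }
}
```

With five write replicas (three natural plus two pending), QUORUM needs quorum(3) + 2 = 4 successes and so tolerates one failure. ONE needs 1 + 2 = 3 and tolerates two.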
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1735137494 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java: ## @@ -82,34 +86,41 @@ public static Collection data() return clsToFailures.stream().map(List::toArray).collect(Collectors.toList()); } -private void setup(ConsistencyLevel.CL consistencyLevel, List failuresPerDc) +private void setup(ConsistencyLevel.CL consistencyLevel) { digestAlgorithm = new XXHash32DigestAlgorithm(); tableWriter = new MockTableWriter(folder); writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-4.0.0", consistencyLevel); transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext(); } +@Test +public void test() throws IOException, ExecutionException, InterruptedException Review Comment: It was there to debug a specific input set. Removed it.
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
yifan-c commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1735122562 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ## @@ -56,35 +56,38 @@ default void ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor, cl.name() + " only make sense for NetworkTopologyStrategy keyspaces"); } +default int quorum(Set group) Review Comment: Yeah, it was left over from the refactoring and I forgot to remove it.
Re: [PR] CASSANDRA-19815: Decouple Cassandra types from Spark types so Cassandra types can be u… [cassandra-analytics]
jberragan commented on code in PR #71: URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1734986657 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java: ## @@ -325,7 +329,10 @@ private void maybeRebuildClusteringKeys(@NotNull ByteBuffer columnNameBuf) { Object newObj = deserialize(field, ByteBufferUtils.extractComponent(columnNameBuf, index++)); Object oldObj = values[field.position()]; -if (newRow || oldObj == null || newObj == null || !field.equals(newObj, oldObj)) +// Historically, we compare equality of clustering keys using the Spark types +// to determine if we have moved to a new 'row'. We could also compare using the Cassandra types +// or the raw ByteBuffers before converting to Spark types - this mightb be slightly more performant. +if (newRow || oldObj == null || newObj == null || !sparkSqlTypeConverter.toSparkType(field.type()).equals(newObj, oldObj)) Review Comment: TODO: convert CqlType to SparkType once in the constructor and re-use.
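The TODO above proposes converting each clustering column's `CqlType` to its Spark type once, in the constructor. That would replace the current `toSparkType(field.type())` call, which runs for every cell.

Below is a generic sketch of that caching. It uses hypothetical type parameters `C` and `S` in place of `CqlType` and the Spark-side type, and a plain `Function` in place of `sparkSqlTypeConverter`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// C stands in for CqlType and S for the converted Spark type; both are hypothetical.
final class CachedTypeConverterSketch<C, S>
{
    private final List<S> convertedByPosition;

    CachedTypeConverterSketch(List<C> clusteringTypes, Function<C, S> converter)
    {
        List<S> converted = new ArrayList<>(clusteringTypes.size());
        for (C type : clusteringTypes)
        {
            converted.add(converter.apply(type)); // conversion happens once per column, here
        }
        this.convertedByPosition = Collections.unmodifiableList(converted);
    }

    // Per-cell lookup is a list access rather than a fresh conversion.
    S typeAt(int position)
    {
        return convertedByPosition.get(position);
    }
}
```

Indexing by clustering-key position mirrors how the loop in `maybeRebuildClusteringKeys` walks the fields. The exact index to use in the real iterator would need checking against `field.position()`.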
Re: [PR] CASSANDRA-19842: Consistency level check incorrectly passes when majo… [cassandra-analytics]
JeetKunDoug commented on code in PR #75: URL: https://github.com/apache/cassandra-analytics/pull/75#discussion_r1731504484 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ## @@ -381,67 +383,79 @@ private InstanceAvailability determineInstanceAvailability(RingInstance instance private TokenRangeMapping getTokenRangeReplicas() { -Map> writeReplicasByDC; -Map> pendingReplicasByDC; -Map replicaMetadata; -Set blockedInstances; +Supplier topologySupplier = () -> { +try +{ +return getTokenRangesAndReplicaSets(); +} +catch (ExecutionException | InterruptedException exception) +{ +LOGGER.error("Failed to get token ranges, ", exception); +throw new RuntimeException(exception); Review Comment: I'd recommend moving the try/catch into `getTokenRangesAndReplicaSets` and throwing a new exception extending `AnalyticsException` here - that way, in the call itself, you can just pass `this::getTokenRangesAndReplicaSets` as the first parameter, which makes this feel a bit more consistent with the rest of the code. ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java: ## @@ -117,6 +109,21 @@ public static void updateFailureHandler(CommitResult commitResult, }); } +private static void logFailedRanges(Logger logger, String phase, + List.ConsistencyFailurePerRange> failedRanges) +{ +for (ReplicaAwareFailureHandler.ConsistencyFailurePerRange failedRange : failedRanges) +{ +for (RingInstance instance : failedRange.failuresPerInstance.instances()) +{ +logger.error("Failed in phase {} for {} on {}", Review Comment: I know this was just a refactoring of code, but logging the **reason** for failure here would be super-useful. 
## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ## @@ -56,35 +56,38 @@ default void ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor, cl.name() + " only make sense for NetworkTopologyStrategy keyspaces"); } +default int quorum(Set group) Review Comment: This is unused, as the static `quorum` method below is the only usage of `quorum` in the codebase. ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java: ## @@ -82,34 +86,41 @@ public static Collection data() return clsToFailures.stream().map(List::toArray).collect(Collectors.toList()); } -private void setup(ConsistencyLevel.CL consistencyLevel, List failuresPerDc) +private void setup(ConsistencyLevel.CL consistencyLevel) { digestAlgorithm = new XXHash32DigestAlgorithm(); tableWriter = new MockTableWriter(folder); writerContext = new MockBulkWriterContext(TOKEN_RANGE_MAPPING, "cassandra-4.0.0", consistencyLevel); transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext(); } +@Test +public void test() throws IOException, ExecutionException, InterruptedException Review Comment: Why does this test exist separate from the parameterized one? Can you please add both some documentation and a more descriptive method name than `test` here so if it fails it's understandable what it's testing and why it is different than the other tests? ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ## @@ -56,35 +56,38 @@ default void ensureNetworkTopologyStrategy(ReplicationFactor replicationFactor, cl.name() + " only make sense for NetworkTopologyStrategy keyspaces"); } +default int quorum(Set group) +{ +return group.size() / 2 + 1; +} + /** - * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor. 
- * - * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes. - * This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy. - * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy - * (and non-pending) nodes. - * - * For both the above cases, blocked instances are also considered as failures while performing consistency checks. - * Write replicas are adjusted to exc
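The `quorum` helper and the checks described in the removed Javadoc can be illustrated with plain sets of instance names. This is a hypothetical sketch of one reading of that Javadoc, not the project's `ConsistencyLevel` implementation: instances are `String`s, the method names are invented, the QUORUM branch counts every write replica (pending ones included) that is neither failed nor blocked, and the ONE/TWO branch additionally excludes pending replicas.

```java
import java.util.HashSet;
import java.util.Set;

public class ConsistencyCheckSketch
{
    // Same formula as the quorum helper in the diff: a strict majority of the group.
    static int quorum(Set<String> group)
    {
        return group.size() / 2 + 1;
    }

    // QUORUM-style check (hypothetical): the threshold comes from the write-replica set,
    // which includes pending nodes; blocked instances are treated as failures.
    static boolean quorumSatisfied(Set<String> writeReplicas, Set<String> failed, Set<String> blocked)
    {
        Set<String> healthy = new HashSet<>(writeReplicas);
        healthy.removeAll(failed);
        healthy.removeAll(blocked);
        return healthy.size() >= quorum(writeReplicas);
    }

    // ONE/TWO-style check (hypothetical): at least `required` healthy, non-pending
    // replicas must remain after removing failed and blocked instances.
    static boolean minimumHealthySatisfied(Set<String> writeReplicas, Set<String> pending,
                                           Set<String> failed, Set<String> blocked, int required)
    {
        Set<String> healthy = new HashSet<>(writeReplicas);
        healthy.removeAll(pending);
        healthy.removeAll(failed);
        healthy.removeAll(blocked);
        return healthy.size() >= required;
    }

    public static void main(String[] args)
    {
        Set<String> replicas = Set.of("a", "b", "c", "d"); // three normal replicas plus pending "d"
        Set<String> pending = Set.of("d");
        System.out.println(quorum(replicas));                                      // 3
        System.out.println(quorumSatisfied(replicas, Set.of("a"), Set.of()));       // true
        System.out.println(quorumSatisfied(replicas, Set.of("a"), Set.of("b")));    // false
        System.out.println(minimumHealthySatisfied(replicas, pending,
                                                   Set.of("a", "b"), Set.of(), 1)); // true
    }
}
```

With four write replicas the quorum is 4 / 2 + 1 = 3, whereas an RF-based quorum for RF = 3 would be 3 / 2 + 1 = 2, which shows why computing quorum over the write-replica set is the stricter check.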
Re: [I] gocql RetryPolicies still don't use query idempotence [cassandra-gocql-driver]
Rikkuru commented on issue #1803: URL: https://github.com/apache/cassandra-gocql-driver/issues/1803#issuecomment-2315267132 > Do you expect the retry of non-idempotent queries or not? I expect that a https://pkg.go.dev/github.com/gocql/gocql#Query whose [IsIdempotent](https://github.com/gocql/gocql/blob/v1.6.0/session.go#L1221) method returns false will not be retried. You are right that it is probably better to check this outside of the retry policy, but I am sure it is not checked in gocql v1.6.0. > The ticket you mentioned was implementing speculative retry policy, which is something different. From issue https://github.com/apache/cassandra-gocql-driver/issues/1153 I understood that https://github.com/apache/cassandra-gocql-driver/issues/1083 contains the work on speculative execution and that https://github.com/apache/cassandra-gocql-driver/issues/1154 was supposed to support non-idempotent queries in retries? Otherwise, issue https://github.com/apache/cassandra-gocql-driver/issues/1153 should not have been closed. > For write operations it depends on idempotency I am not sure we should think too much about read/write operations here: a gocql Query is the same for both and does not provide a convenient way to check whether it writes or reads data. I think gocql should just check the IsIdempotent method and trust the user to set it correctly?
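The behaviour requested in this comment, checking idempotence before consulting the retry policy, can be sketched independently of any driver. The example is written in Java with hypothetical `Query` and `RetryPolicy` interfaces; it is not gocql's Go API (gocql's own `RetryPolicy` interface has a different shape), only an illustration of where such a check could sit.

```java
import java.util.concurrent.Callable;

public class IdempotentRetrySketch
{
    // Hypothetical query abstraction; in gocql the analogous accessor is Query.IsIdempotent().
    interface Query<T> extends Callable<T>
    {
        boolean isIdempotent();
    }

    // Hypothetical retry policy: only decides whether another attempt is allowed.
    interface RetryPolicy
    {
        boolean shouldRetry(int attemptsSoFar, Exception lastError);
    }

    static <T> Query<T> query(boolean idempotent, Callable<T> body)
    {
        return new Query<T>()
        {
            @Override
            public T call() throws Exception
            {
                return body.call();
            }

            @Override
            public boolean isIdempotent()
            {
                return idempotent;
            }
        };
    }

    // The idempotence check happens here, outside the policy, so no policy
    // can re-send a statement that is not marked idempotent.
    static <T> T execute(Query<T> query, RetryPolicy policy) throws Exception
    {
        int attempts = 0;
        while (true)
        {
            try
            {
                attempts++;
                return query.call();
            }
            catch (Exception e)
            {
                if (!query.isIdempotent() || !policy.shouldRetry(attempts, e))
                {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        RetryPolicy upToThreeAttempts = (attempts, error) -> attempts < 3;
        int[] calls = {0};
        // Fails on the first two calls, succeeds on the third.
        Callable<String> flaky = () -> {
            calls[0]++;
            if (calls[0] < 3)
            {
                throw new IllegalStateException("timeout");
            }
            return "ok";
        };

        System.out.println(execute(query(true, flaky), upToThreeAttempts) + " after " + calls[0] + " calls"); // ok after 3 calls

        calls[0] = 0;
        try
        {
            execute(query(false, flaky), upToThreeAttempts);
        }
        catch (IllegalStateException e)
        {
            System.out.println("not retried, calls = " + calls[0]); // not retried, calls = 1
        }
    }
}
```

Keeping the check in the executor rather than in each policy means a custom policy cannot accidentally re-send a non-idempotent statement; the trade-off, as the comment notes, is that correctness then depends on callers marking queries idempotent accurately.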