This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new 4624a17 CASSANDRA-19933: Support aggregated consistency validation for multiple clusters (#86) 4624a17 is described below commit 4624a17098e055e0abf9a6025451d4352cb9c147 Author: Yifan Cai <y...@apache.org> AuthorDate: Tue Sep 24 23:50:35 2024 -0700 CASSANDRA-19933: Support aggregated consistency validation for multiple clusters (#86) Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19933 --- CHANGES.txt | 1 + .../spark/common/model/CassandraInstance.java | 5 - cassandra-analytics-core/build.gradle | 4 + .../spark/bulkwriter/BulkWriteValidator.java | 8 +- .../spark/bulkwriter/BulkWriterContextFactory.java | 12 +- .../bulkwriter/CassandraBulkSourceRelation.java | 3 +- .../cassandra/spark/bulkwriter/RecordWriter.java | 3 +- .../cassandra/spark/bulkwriter/RingInstance.java | 1 + .../MultiClusterReplicaAwareFailureHandler.java | 135 ++++++++++++++ .../token/ReplicaAwareFailureHandler.java | 152 +++++----------- .../SingleClusterReplicaAwareFailureHandler.java | 196 +++++++++++++++++++++ .../spark/bulkwriter/token/TokenRangeMapping.java | 24 ++- .../spark/sparksql/CassandraDataSink.java | 2 +- .../java/org/apache/cassandra/spark/TestUtils.java | 94 ++++++---- .../spark/bulkwriter/BulkWriteValidatorTest.java | 3 +- .../spark/bulkwriter/CassandraClusterInfoTest.java | 8 +- .../spark/bulkwriter/DirectStreamSessionTest.java | 3 +- .../ImportCompletionCoordinatorTest.java | 4 +- .../spark/bulkwriter/MockBulkWriterContext.java | 1 + .../spark/bulkwriter/RingInstanceTest.java | 102 ++--------- .../bulkwriter/StreamSessionConsistencyTest.java | 4 +- .../spark/bulkwriter/TokenRangeMappingUtils.java | 4 +- .../blobupload/BlobStreamSessionTest.java | 4 +- .../token/FailureHandlerTextContext.java | 38 ++++ ...MultiClusterReplicaAwareFailureHandlerTest.java | 155 ++++++++++++++++ ...ingleClusterReplicaAwareFailureHandlerTest.java | 121 +++++++++++++ .../bulkwriter/token/TokenRangeMappingTest.java 
| 8 +- 27 files changed, 821 insertions(+), 274 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index a45595e..2cb65f5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Support aggregated consistency validation for multiple clusters (CASSANDRA-19933) * Add transport extension for coordinated write (CASSANDRA-19923) * Support data partitioning for multiple clusters coordinated write (CASSANDRA-19910) * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (CASSANDRA-19909) diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java index c69b6eb..fed2f2d 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java @@ -29,11 +29,6 @@ public interface CassandraInstance extends TokenOwner */ @Nullable String clusterId(); - default boolean hasClusterId() - { - return clusterId() != null; - } - String nodeName(); String datacenter(); diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle index b0f9c9c..e33a1ac 100644 --- a/cassandra-analytics-core/build.gradle +++ b/cassandra-analytics-core/build.gradle @@ -147,6 +147,10 @@ test { destination = destDir } } + + testLogging { + events "started", "passed", "skipped", "failed" + } } /* Start: JaCoCo check */ diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java index 56e1989..a8377a6 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java @@ -32,16 +32,18 @@ import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException; +/** + * A validator for bulk write result against the target cluster(s). + */ public class BulkWriteValidator { private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteValidator.class); + private final ClusterInfo cluster; private final ReplicaAwareFailureHandler<RingInstance> failureHandler; private final JobInfo job; private String phase = "Initializing"; - private final ClusterInfo cluster; - public BulkWriteValidator(BulkWriterContext bulkWriterContext, ReplicaAwareFailureHandler<RingInstance> failureHandler) { @@ -58,7 +60,7 @@ public class BulkWriteValidator ClusterInfo cluster) { List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges = - failureHandler.getFailedRanges(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor()); + failureHandler.getFailedRanges(tokenRangeMapping, job, cluster); if (failedRanges.isEmpty()) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java index 9d710bd..72b360c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java @@ -38,8 +38,6 @@ public class BulkWriterContextFactory { private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriterContextFactory.class); - public static final BulkWriterContextFactory INSTANCE = new BulkWriterContextFactory(); 
- @NotNull public BulkWriterContext createBulkWriterContext(@NotNull SparkContext sparkContext, @NotNull Map<String, String> options, @@ -47,9 +45,9 @@ public class BulkWriterContextFactory { Preconditions.checkNotNull(schema); - BulkWriterContext bulkWriterContext; - BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), options); + BulkSparkConf conf = createBulkSparkConf(sparkContext, options); int sparkDefaultParallelism = sparkContext.defaultParallelism(); + BulkWriterContext bulkWriterContext; if (conf.isCoordinatedWriteConfigured()) { LOGGER.info("Initializing bulk writer context for multi-clusters coordinated write"); @@ -67,6 +65,12 @@ public class BulkWriterContextFactory return bulkWriterContext; } + @NotNull + protected BulkSparkConf createBulkSparkConf(@NotNull SparkContext sparkContext, @NotNull Map<String, String> options) + { + return new BulkSparkConf(sparkContext.getConf(), options); + } + @NotNull protected BulkWriterContext createBulkWriterContext(@NotNull BulkSparkConf conf, StructType schema, int sparkDefaultParallelism) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index 3eb39c1..51d18b8 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -39,6 +39,7 @@ import o.a.c.sidecar.client.shaded.common.data.RestoreJobStatus; import o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobRequestPayload; import o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload; import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import 
org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.common.client.ClientException; import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration; @@ -78,7 +79,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta this.sqlContext = sqlContext; this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext()); this.broadcastContext = sparkContext.<BulkWriterContext>broadcast(writerContext); - ReplicaAwareFailureHandler<RingInstance> failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); + ReplicaAwareFailureHandler<RingInstance> failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); this.simpleTaskScheduler = new SimpleTaskScheduler(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index 2cd6abf..fe73b7b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -46,6 +46,7 @@ import com.google.common.collect.Range; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.bulkwriter.util.TaskContextUtils; @@ -96,7 +97,7 @@ public class RecordWriter this.columnNames = columnNames; this.taskContextSupplier = taskContextSupplier; this.tableWriterFactory = tableWriterFactory; - this.failureHandler = 
new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); + this.failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); this.writeValidator = new BulkWriteValidator(writerContext, failureHandler); this.digestAlgorithm = this.writerContext.job().digestAlgorithmSupplier().get(); this.streamFutures = new HashMap<>(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java index 362e04c..2304b55 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java @@ -80,6 +80,7 @@ public class RingInstance implements CassandraInstance, Serializable } @Override + @Nullable public String clusterId() { return clusterId; diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java new file mode 100644 index 0000000..36c0e93 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.token; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CassandraClusterInfoGroup; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConf; +import org.apache.cassandra.spark.common.model.CassandraInstance; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.Nullable; + +/** + * A ReplicaAwareFailureHandler that can handle multiple clusters, including the case of single cluster. 
+ * + * @param <I> CassandraInstance type + */ +public class MultiClusterReplicaAwareFailureHandler<I extends CassandraInstance> extends ReplicaAwareFailureHandler<I> +{ + // default failure handler for CassandraInstance w/o specifying the belonging clusterId + private final SingleClusterReplicaAwareFailureHandler<I> defaultFailureHandler; + private final Map<String, SingleClusterReplicaAwareFailureHandler<I>> failureHandlerPerCluster = new HashMap<>(); + + private final Partitioner partitioner; + + public MultiClusterReplicaAwareFailureHandler(Partitioner partitioner) + { + this.partitioner = partitioner; + this.defaultFailureHandler = new SingleClusterReplicaAwareFailureHandler<>(partitioner, null); + } + + @Override + public synchronized void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage) + { + String clusterId = instance.clusterId(); + if (clusterId != null) + { + Preconditions.checkState(defaultFailureHandler.isEmpty(), + "Cannot track failures from both instances with and without clusterId"); + ReplicaAwareFailureHandler<I> handler = failureHandlerPerCluster + .computeIfAbsent(clusterId, k -> new SingleClusterReplicaAwareFailureHandler<>(partitioner, clusterId)); + handler.addFailure(tokenRange, instance, errMessage); + } + else + { + Preconditions.checkState(failureHandlerPerCluster.isEmpty(), + "Cannot track failures from both instances with and without clusterId"); + defaultFailureHandler.addFailure(tokenRange, instance, errMessage); + } + } + + @Override + public synchronized Set<I> getFailedInstances() + { + if (failureHandlerPerCluster.isEmpty()) + { + return defaultFailureHandler.getFailedInstances(); + } + + // failed instances merged from all clusters + HashSet<I> failedInstances = new HashSet<>(); + failureHandlerPerCluster.values().forEach(handler -> failedInstances.addAll(handler.getFailedInstances())); + return failedInstances; + } + + @Override + public synchronized 
List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange> + getFailedRanges(TokenRangeMapping<I> tokenRangeMapping, + JobInfo job, + ClusterInfo cluster) + { + if (failureHandlerPerCluster.isEmpty()) + { + return defaultFailureHandler.getFailedRanges(tokenRangeMapping, job, cluster); + } + + List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange> failurePerRanges = new ArrayList<>(); + CoordinatedWriteConf coordinatedWriteConf = job.coordinatedWriteConf(); + Preconditions.checkState(coordinatedWriteConf != null, + "CoordinatedWriteConf is absent for multi-cluster write"); + Preconditions.checkState(cluster instanceof CassandraClusterInfoGroup, + "Not a CassandraClusterInfoGroup for multi-cluster write"); + CassandraClusterInfoGroup group = (CassandraClusterInfoGroup) cluster; + failureHandlerPerCluster.forEach((clusterId, handler) -> { + ClusterConf clusterConf = Objects.requireNonNull(coordinatedWriteConf.cluster(clusterId), + () -> "ClusterConf is not found for " + clusterId); + ClusterInfo clusterInfo = Objects.requireNonNull(group.cluster(clusterId), + () -> "ClusterInfo is not found for " + clusterId); + ReplicationFactor rf = clusterInfo.replicationFactor(); + failurePerRanges.addAll(handler.getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), clusterConf.localDc(), rf)); + }); + return failurePerRanges; + } + + @Override + protected List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange> + getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping, + ConsistencyLevel cl, + @Nullable String localDC, + ReplicationFactor replicationFactor) + { + throw new UnsupportedOperationException("Not implemented for MultiClusterReplicaAwareFailureHandler"); + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java index f821ba9..45067b2 
100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java @@ -20,39 +20,37 @@ package org.apache.cassandra.spark.bulkwriter.token; import java.math.BigInteger; -import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; -import java.util.stream.Collectors; -import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Range; -import com.google.common.collect.RangeMap; -import com.google.common.collect.TreeRangeMap; +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; import org.apache.cassandra.spark.common.model.CassandraInstance; -import org.apache.cassandra.spark.common.model.NodeStatus; import org.apache.cassandra.spark.data.ReplicationFactor; -import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.jetbrains.annotations.Nullable; -public class ReplicaAwareFailureHandler<Instance extends CassandraInstance> +/** + * Handles write failures of a single cluster + * @param <I> CassandraInstance type + */ +public abstract class ReplicaAwareFailureHandler<I extends CassandraInstance> { public class FailuresPerInstance { - private final Multimap<Instance, String> errorMessagesPerInstance; + private final Multimap<I, String> errorMessagesPerInstance; public FailuresPerInstance() { this.errorMessagesPerInstance = ArrayListMultimap.create(); } - public FailuresPerInstance(Multimap<Instance, String> errorMessagesPerInstance) + public FailuresPerInstance(Multimap<I, String> errorMessagesPerInstance) { this.errorMessagesPerInstance = ArrayListMultimap.create(errorMessagesPerInstance); } @@ -62,26 
+60,32 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance> return new FailuresPerInstance(this.errorMessagesPerInstance); } - public Set<Instance> instances() + public Set<I> instances() { return errorMessagesPerInstance.keySet(); } - public void addErrorForInstance(Instance instance, String errorMessage) + public void addErrorForInstance(I instance, String errorMessage) { errorMessagesPerInstance.put(instance, errorMessage); } - public boolean hasError(Instance instance) + public boolean hasError(I instance) { return errorMessagesPerInstance.containsKey(instance) && !errorMessagesPerInstance.get(instance).isEmpty(); } - public void forEachInstance(BiConsumer<Instance, Collection<String>> instanceErrorsConsumer) + public void forEachInstance(BiConsumer<I, Collection<String>> instanceErrorsConsumer) { errorMessagesPerInstance.asMap().forEach(instanceErrorsConsumer); } + + @Override + public String toString() + { + return errorMessagesPerInstance.toString(); + } } public class ConsistencyFailurePerRange @@ -96,13 +100,18 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance> } } - // failures captures per each range; note that failures do not necessarily fail a range, as long as consistency level is considered - private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create(); - - public ReplicaAwareFailureHandler(Partitioner partitioner) - { - rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance()); - } + /** + * Given the number of failed instances for each token range, validates if the consistency guarantees are maintained for the job + * + * @param tokenRangeMapping the mapping of token ranges to a Cassandra instance + * @param job the job to verify + * @param cluster cluster info + * @return list of failed token ranges that break consistency. This should ideally be empty for a + * successful operation. 
+ */ + public abstract List<ConsistencyFailurePerRange> getFailedRanges(TokenRangeMapping<I> tokenRangeMapping, + JobInfo job, + ClusterInfo cluster); /** * Adds a new token range as a failed token range, with errors on given instance. @@ -115,32 +124,16 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance> * one returned from failedRangesMap map. As our range could be overlapping partially and the map could be used * by other range. * - * @param tokenRange the range which failed - * @param casInstance the instance on which the range failed - * @param errMessage the error that occurred for this particular range/instance pair + * @param tokenRange the range which failed + * @param instance the instance on which the range failed + * @param errMessage the error that occurred for this particular range/instance pair */ - public synchronized void addFailure(Range<BigInteger> tokenRange, Instance casInstance, String errMessage) - { - RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange); - RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create(); + public abstract void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage); - for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet()) - { - FailuresPerInstance newErrorMap = entry.getValue().copy(); - newErrorMap.addErrorForInstance(casInstance, errMessage); - mappingsToAdd.put(entry.getKey(), newErrorMap); - } - rangeFailuresMap.putAll(mappingsToAdd); - } - - public Set<Instance> getFailedInstances() - { - return rangeFailuresMap.asMapOfRanges().values() - .stream() - .map(FailuresPerInstance::instances) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } + /** + * @return the set of all failed instances + */ + public abstract Set<I> getFailedInstances(); /** * Given the number of failed instances for each token range, validates if the 
consistency guarantees are maintained @@ -149,69 +142,12 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance> * @param tokenRangeMapping the mapping of token ranges to a Cassandra instance * @param cl the desired consistency level * @param localDC the local datacenter + * @param replicationFactor replication of the enclosing keyspace * @return list of failed token ranges that break consistency. This should ideally be empty for a * successful operation. */ - public synchronized List<ConsistencyFailurePerRange> - getFailedRanges(TokenRangeMapping<Instance> tokenRangeMapping, - ConsistencyLevel cl, - @Nullable String localDC, - ReplicationFactor replicationFactor) - { - Preconditions.checkArgument((cl.isLocal() && localDC != null) || (!cl.isLocal() && localDC == null), - "Not a valid pair of consistency level configuration. " + - "Consistency level: " + cl + " localDc: " + localDC); - List<ConsistencyFailurePerRange> failedRanges = new ArrayList<>(); - - for (Map.Entry<Range<BigInteger>, FailuresPerInstance> failedRangeEntry : rangeFailuresMap.asMapOfRanges() - .entrySet()) - { - Range<BigInteger> range = failedRangeEntry.getKey(); - FailuresPerInstance errorMap = failedRangeEntry.getValue(); - Set<Instance> failedReplicas = errorMap.instances() - .stream() - .filter(errorMap::hasError) - .collect(Collectors.toSet()); - - // no failures found for the range; skip consistency check on this one and move on - if (failedReplicas.isEmpty()) - { - continue; - } - - tokenRangeMapping.getWriteReplicasOfRange(range, localDC) - .forEach((subrange, liveAndDown) -> { - if (!checkSubrange(cl, localDC, replicationFactor, liveAndDown, failedReplicas)) - { - failedRanges.add(new ConsistencyFailurePerRange(subrange, errorMap)); - } - }); - } - - return failedRanges; - } - - /** - * Check whether a CL can be satisfied for each sub-range. - * @return true if consistency is satisfied; false otherwise. 
- */ - private boolean checkSubrange(ConsistencyLevel cl, - @Nullable String localDC, - ReplicationFactor replicationFactor, - Set<Instance> liveAndDown, - Set<Instance> failedReplicas) - { - Set<Instance> liveReplicas = liveAndDown.stream() - .filter(instance -> instance.nodeStatus() == NodeStatus.UP) - .collect(Collectors.toSet()); - Set<Instance> pendingReplicas = liveAndDown.stream() - .filter(instance -> instance.nodeState().isPending) - .collect(Collectors.toSet()); - // success is assumed if not failed - Set<Instance> succeededReplicas = liveReplicas.stream() - .filter(instance -> !failedReplicas.contains(instance)) - .collect(Collectors.toSet()); - - return cl.canBeSatisfied(succeededReplicas, pendingReplicas, replicationFactor, localDC); - } + protected abstract List<ConsistencyFailurePerRange> getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping, + ConsistencyLevel cl, + @Nullable String localDC, + ReplicationFactor replicationFactor); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java new file mode 100644 index 0000000..7a35091 --- /dev/null +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.token; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; +import org.apache.cassandra.spark.common.model.CassandraInstance; +import org.apache.cassandra.spark.common.model.NodeStatus; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.Nullable; + +/** + * ReplicaAwareFailureHandler for a single cluster + * The handler should be constructed by {@link MultiClusterReplicaAwareFailureHandler} only, hence package-private + * @param <I> CassandraInstance type + */ +class SingleClusterReplicaAwareFailureHandler<I extends CassandraInstance> extends ReplicaAwareFailureHandler<I> +{ + // failures captures per each range; note that failures do not necessarily fail a range, as long as consistency level is considered + @GuardedBy("this") + private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create(); + + @GuardedBy("this") + 
private boolean isEmpty = true; + + @Nullable + private final String clusterId; + + SingleClusterReplicaAwareFailureHandler(Partitioner partitioner, @Nullable String clusterId) + { + this.clusterId = clusterId; + rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance()); + } + + /** + * Check whether the handler contains any failure + * @return true if there is at least a failure; false otherwise. + */ + // synchronized: isEmpty is @GuardedBy(this) and mutated in addFailure; an unsynchronized, + // non-volatile read could observe a stale value from another thread + public synchronized boolean isEmpty() + { + return isEmpty; + } + + @Override + public List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange> + getFailedRanges(TokenRangeMapping<I> tokenRangeMapping, JobInfo job, ClusterInfo cluster) + { + return getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor()); + } + + @Override + public synchronized void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage) + { + RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange); + RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create(); + + for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet()) + { + FailuresPerInstance newErrorMap = entry.getValue().copy(); + newErrorMap.addErrorForInstance(instance, errMessage); + mappingsToAdd.put(entry.getKey(), newErrorMap); + } + rangeFailuresMap.putAll(mappingsToAdd); + isEmpty = false; + } + + @Override + public synchronized Set<I> getFailedInstances() + { + if (isEmpty) + { + return Collections.emptySet(); + } + + return rangeFailuresMap.asMapOfRanges() + .values() + .stream() + .map(FailuresPerInstance::instances) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + @Override + protected synchronized List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange> + getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping, + ConsistencyLevel cl, + @Nullable String 
localDC, + ReplicationFactor replicationFactor) + { + Preconditions.checkArgument((cl.isLocal() && localDC != null) || (!cl.isLocal() && localDC == null), + "Not a valid pair of consistency level configuration. " + + "Consistency level: " + cl + " localDc: " + localDC); + List<ConsistencyFailurePerRange> failedRanges = new ArrayList<>(); + + if (isEmpty) + { + return failedRanges; + } + + for (Map.Entry<Range<BigInteger>, FailuresPerInstance> failedRangeEntry : rangeFailuresMap.asMapOfRanges().entrySet()) + { + Range<BigInteger> range = failedRangeEntry.getKey(); + FailuresPerInstance errorMap = failedRangeEntry.getValue(); + Set<I> failedReplicas = errorMap.instances() + .stream() + .filter(errorMap::hasError) + .collect(Collectors.toSet()); + + // no failures found for the range; skip consistency check on this one and move on + if (failedReplicas.isEmpty()) + { + continue; + } + + tokenRangeMapping.getWriteReplicasOfRange(range, instance -> { + boolean shouldKeep = localDC == null || instance.datacenter().equalsIgnoreCase(localDC); + if (shouldKeep && clusterId != null) + { + shouldKeep = clusterId.equalsIgnoreCase(instance.clusterId()); + } + return shouldKeep; + }) + .forEach((subrange, liveAndDown) -> { + if (!checkSubrange(cl, localDC, replicationFactor, liveAndDown, failedReplicas)) + { + failedRanges.add(new ConsistencyFailurePerRange(subrange, errorMap)); + } + }); + } + + return failedRanges; + } + + /** + * Check whether a CL can be satisfied for each sub-range. + * @return true if consistency is satisfied; false otherwise. 
+ */ + private boolean checkSubrange(ConsistencyLevel cl, + @Nullable String localDC, + ReplicationFactor replicationFactor, + Set<I> liveAndDown, + Set<I> failedReplicas) + { + Set<I> pendingReplicas = new HashSet<>(); + // success is assumed if not failed + Set<I> succeededReplicas = new HashSet<>(); + for (I instance : liveAndDown) + { + if (instance.nodeStatus() == NodeStatus.UP && !failedReplicas.contains(instance)) + { + succeededReplicas.add(instance); + } + if (instance.nodeState().isPending) + { + pendingReplicas.add(instance); + } + } + + return cl.canBeSatisfied(succeededReplicas, pendingReplicas, replicationFactor, localDC); + } +} diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java index 367c385..caf1f65 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java @@ -29,9 +29,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -204,21 +206,35 @@ public class TokenRangeMapping<I extends CassandraInstance> implements Serializa * @param localDc local DC name to filter out non-local-DC instances. The parameter is optional. When not present, i.e. 
null, no filtering is applied * @return the write replicas of sub-ranges */ + @VisibleForTesting public Map<Range<BigInteger>, Set<I>> getWriteReplicasOfRange(Range<BigInteger> range, @Nullable String localDc) + { + // localDc == null means "no filtering" per the javadoc above; equalsIgnoreCase(null) would + // return false and filter out every instance, so short-circuit the null case explicitly + return getWriteReplicasOfRange(range, instance -> localDc == null || instance.datacenter().equalsIgnoreCase(localDc)); + } + + /** + * Get write replica-sets of sub-ranges that overlap with the input range. + * + * @param range range to check. The range can potentially overlap with multiple ranges. + * For example, a down node adds one failure of a token range that covers multiple primary token ranges that replicate to it. + * @param instanceFilter predicate to filter the instances + * @return the write replicas of sub-ranges + */ + public Map<Range<BigInteger>, Set<I>> getWriteReplicasOfRange(Range<BigInteger> range, @Nullable Predicate<I> instanceFilter) { Map<Range<BigInteger>, List<I>> subRangeReplicas = instancesByTokenRange.subRangeMap(range).asMapOfRanges(); - Function<List<I>, Set<I>> inDcInstances = instances -> { - if (localDc != null) + Function<List<I>, Set<I>> filterAndTransform = instances -> { + if (instanceFilter != null) { return instances.stream() - .filter(instance -> instance.datacenter().equalsIgnoreCase(localDc)) + .filter(instanceFilter) .collect(Collectors.toSet()); } return new HashSet<>(instances); }; return subRangeReplicas.entrySet() .stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> inDcInstances.apply(entry.getValue()))); + .collect(Collectors.toMap(Map.Entry::getKey, entry -> filterAndTransform.apply(entry.getValue()))); } // Used for writes diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java index bdd6bc1..1791dae 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java @@ -103,6 +103,6 @@ public class CassandraDataSink implements DataSourceRegister, CreatableRelationP @NotNull protected BulkWriterContextFactory factory() { - return BulkWriterContextFactory.INSTANCE; + return new BulkWriterContextFactory(); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java index 60266f0..d7c6bdf 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java @@ -38,6 +38,7 @@ import java.util.stream.Stream; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; @@ -67,13 +68,26 @@ import static org.quicktheories.generators.SourceDSL.arbitrary; public final class TestUtils { - private static final SparkSession SPARK = SparkSession.builder() - .appName("Java Test") - .config("spark.master", "local") - // Spark is not case-sensitive by default, but we want to make it case-sensitive for - // the quoted identifiers tests where we test mixed case - .config("spark.sql.caseSensitive", "True") - .getOrCreate(); + private static class Holder + { + private static final SparkSession SPARK_SESSION = createSession(); + + static SparkSession createSession() + { + return SparkSession.builder() + .appName("Java Test") + .config("spark.master", "local") + // Spark is not case-sensitive by default, but we want to make it case-sensitive for + // the quoted identifiers tests where we test mixed case + .config("spark.sql.caseSensitive", "True") + .getOrCreate(); + } + } + + private static SparkSession session() + { + 
return Holder.SPARK_SESSION; + } private TestUtils() { @@ -162,15 +176,17 @@ public final class TestUtils Set<CqlField.CqlUdt> udts, @Nullable String statsClass) { - DataFrameReader frameReader = SPARK.read().format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource") - .option("keyspace", keyspace) - .option("createStmt", createStmt) - .option("dirs", dir.toAbsolutePath().toString()) - .option("version", version.toString()) - .option("useBufferingInputStream", true) // Use in the test system to test the BufferingInputStream - .option("partitioner", partitioner.name()) - .option("udts", udts.stream().map(f -> f.createStatement(bridge.cassandraTypes(), keyspace)) - .collect(Collectors.joining("\n"))); + DataFrameReader frameReader = session() + .read() + .format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource") + .option("keyspace", keyspace) + .option("createStmt", createStmt) + .option("dirs", dir.toAbsolutePath().toString()) + .option("version", version.toString()) + .option("useBufferingInputStream", true) // Use in the test system to test the BufferingInputStream + .option("partitioner", partitioner.name()) + .option("udts", udts.stream().map(f -> f.createStatement(bridge.cassandraTypes(), keyspace)) + .collect(Collectors.joining("\n"))); if (statsClass != null) { frameReader = frameReader.option("statsClass", statsClass); @@ -180,11 +196,12 @@ public final class TestUtils public static Dataset<Row> read(Path path, StructType schema) { - return SPARK.read() - .format("parquet") - .option("path", path.toString()) - .schema(schema) - .load(); + return session() + .read() + .format("parquet") + .option("path", path.toString()) + .schema(schema) + .load(); } // CHECKSTYLE IGNORE: Method with many parameters @@ -200,18 +217,19 @@ public final class TestUtils @Nullable String filterExpression, @Nullable String... 
columns) { - DataFrameReader frameReader = SPARK.read() - .format("org.apache.cassandra.spark.sparksql.LocalDataSource") - .option("keyspace", keyspace) - .option("createStmt", createStatement) - .option("dirs", directory.toAbsolutePath().toString()) - .option("version", version.toString()) - .option("useBufferingInputStream", true) // Use in the test system to test the BufferingInputStream - .option("partitioner", partitioner.name()) - .option(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(), addLastModifiedTimestampColumn) - .option("udts", udts.stream() - .map(udt -> udt.createStatement(bridge.cassandraTypes(), keyspace)) - .collect(Collectors.joining("\n"))); + DataFrameReader frameReader = session() + .read() + .format("org.apache.cassandra.spark.sparksql.LocalDataSource") + .option("keyspace", keyspace) + .option("createStmt", createStatement) + .option("dirs", directory.toAbsolutePath().toString()) + .option("version", version.toString()) + .option("useBufferingInputStream", true) // Use in the test system to test the BufferingInputStream + .option("partitioner", partitioner.name()) + .option(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(), addLastModifiedTimestampColumn) + .option("udts", udts.stream() + .map(udt -> udt.createStatement(bridge.cassandraTypes(), keyspace)) + .collect(Collectors.joining("\n"))); if (statsClass != null) { frameReader = frameReader.option("statsClass", statsClass); @@ -364,4 +382,14 @@ public final class TestUtils .substring(0, size) .getBytes(StandardCharsets.UTF_8); } + + public static Range<BigInteger> range(long start, long end) + { + return range(BigInteger.valueOf(start), BigInteger.valueOf(end)); + } + + public static Range<BigInteger> range(BigInteger start, BigInteger end) + { + return Range.openClosed(start, end); + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java index ef6cb13..5b33859 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.junit.jupiter.api.Test; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.data.QualifiedTableName; @@ -77,7 +78,7 @@ class BulkWriteValidatorTest when(mockJobInfo.jobKeepAliveMinutes()).thenReturn(-1); when(mockWriterContext.job()).thenReturn(mockJobInfo); - ReplicaAwareFailureHandler<RingInstance> failureHandler = new ReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner); + ReplicaAwareFailureHandler<RingInstance> failureHandler = new MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner); BulkWriteValidator writerValidator = new BulkWriteValidator(mockWriterContext, failureHandler); assertThatThrownBy(() -> writerValidator.validateClOrFail(topology)) .isExactlyInstanceOf(ConsistencyNotSatisfiedException.class) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java index 846b897..8497f81 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java @@ -19,20 +19,19 @@ package 
org.apache.cassandra.spark.bulkwriter; -import java.math.BigInteger; import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.concurrent.CompletableFuture; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Range; import org.junit.jupiter.api.Test; import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.exception.TimeSkewTooLargeException; +import static org.apache.cassandra.spark.TestUtils.range; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -76,11 +75,6 @@ public class CassandraClusterInfoTest return new MockClusterInfoForTimeSkew(allowanceMinutes, remoteNow); } - private static Range<BigInteger> range(long start, long end) - { - return Range.openClosed(BigInteger.valueOf(start), BigInteger.valueOf(end)); - } - private static class MockClusterInfoForTimeSkew extends CassandraClusterInfo { private CassandraContext cassandraContext; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java index 95af8d9..2c85e75 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import 
org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.model.CassandraInstance; @@ -278,7 +279,7 @@ public class DirectStreamSessionTest @NotNull private ReplicaAwareFailureHandler<RingInstance> replicaAwareFailureHandler() { - return new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); + return new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()); } private DirectStreamSession createStreamSession(MockTableWriter.Creator writerCreator) diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java index 7386c69..5a15198 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java @@ -56,7 +56,7 @@ import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult; import org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice; import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClient; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; -import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.data.QualifiedTableName; import org.apache.cassandra.spark.data.ReplicationFactor; @@ -124,7 +124,7 @@ class ImportCompletionCoordinatorTest when(mockClusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(topology); when(mockWriterContext.job()).thenReturn(mockJobInfo); - writerValidator = new BulkWriteValidator(mockWriterContext, new 
ReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner)); + writerValidator = new BulkWriteValidator(mockWriterContext, new MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner)); // clients will not be used in this test class; mock is at the API method level BlobDataTransferApi api = new BlobDataTransferApi(mockJobInfo, mock(SidecarClient.class), mock(StorageClient.class)); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index 9fd90ad..6f33183 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -227,6 +227,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return consistencyLevel; } + // todo: yifan - unused? 
public void setConsistencyLevel(ConsistencyLevel.CL consistencyLevel) { this.consistencyLevel = consistencyLevel; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java index 022b44e..bd9efbb 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java @@ -25,45 +25,27 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Range; import org.junit.jupiter.api.Test; import o.a.c.sidecar.client.shaded.common.response.data.RingEntry; -import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL; -import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; -import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; -import org.apache.cassandra.spark.data.ReplicationFactor; -import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.jetbrains.annotations.NotNull; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; public class RingInstanceTest { - public static final String DATACENTER_1 = "DATACENTER1"; - - static List<RingInstance> 
getInstances(BigInteger[] tokens, String datacenter) + public static RingInstance instance(BigInteger token, String nodeName, String datacenter) { - List<RingInstance> instances = new ArrayList<>(); - for (int token = 0; token < tokens.length; token++) - { - instances.add(instance(tokens[token], "node-" + token, datacenter)); - } - return instances; + return instance(token, nodeName, datacenter, null); } - static RingInstance instance(BigInteger token, String nodeName, String datacenter) + public static RingInstance instance(BigInteger token, String nodeName, String datacenter, String clusterId) { return new RingInstance(new RingEntry.Builder() .datacenter(datacenter) @@ -77,49 +59,8 @@ public class RingInstanceTest .owns("") .load("") .hostId("") - .build()); - } - - static BigInteger[] getTokens(Partitioner partitioner, int nodes) - { - BigInteger[] tokens = new BigInteger[nodes]; - - for (int node = 0; node < nodes; node++) - { - tokens[node] = partitioner == Partitioner.Murmur3Partitioner - ? 
getMurmur3Token(nodes, node) - : getRandomToken(nodes, node); - } - return tokens; - } - - static BigInteger getRandomToken(int nodes, int index) - { - // ((2^127 / nodes) * i) - return ((BigInteger.valueOf(2).pow(127)).divide(BigInteger.valueOf(nodes))).multiply(BigInteger.valueOf(index)); - } - - static BigInteger getMurmur3Token(int nodes, int index) - { - // (((2^64 / n) * i) - 2^63) - return (((BigInteger.valueOf(2).pow(64)).divide(BigInteger.valueOf(nodes))) - .multiply(BigInteger.valueOf(index))).subtract(BigInteger.valueOf(2).pow(63)); - } - - private static ReplicaAwareFailureHandler<RingInstance> ntsStrategyHandler(Partitioner partitioner) - { - return new ReplicaAwareFailureHandler<>(partitioner); - } - - private static Map<String, Integer> ntsOptions(String[] names, int[] values) - { - assert names.length == values.length : "Invalid replication options - array lengths do not match"; - Map<String, Integer> options = Maps.newHashMap(); - for (int name = 0; name < names.length; name++) - { - options.put(names[name], values[name]); - } - return options; + .build(), + clusterId); } @Test @@ -177,14 +118,14 @@ public class RingInstanceTest } @Test - public void testHasClusterId() + public void testGetClusterId() { RingEntry ringEntry = mockRingEntry(); RingInstance instance = new RingInstance(ringEntry); - assertFalse(instance.hasClusterId()); + assertNull(instance.clusterId()); RingInstance instanceWithClusterId = new RingInstance(ringEntry, "cluster1"); - assertTrue(instanceWithClusterId.hasClusterId()); + assertNotNull(instanceWithClusterId.clusterId()); assertEquals("cluster1", instanceWithClusterId.clusterId()); } @@ -220,30 +161,7 @@ public class RingInstanceTest assertEquals(1, newErrorMap.keySet().size()); } - @Test - public void testMultipleFailuresSingleInstanceSucceedRF3() - { - Partitioner partitioner = Partitioner.Murmur3Partitioner; - BigInteger[] tokens = getTokens(partitioner, 5); - List<RingInstance> instances = getInstances(tokens, 
DATACENTER_1); - RingInstance instance1 = instances.get(0); - RingInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter()); - ReplicaAwareFailureHandler<RingInstance> replicationFactor3 = ntsStrategyHandler(partitioner); - ReplicationFactor repFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, - ntsOptions(new String[]{DATACENTER_1 }, new int[]{3 })); - Multimap<RingInstance, Range<BigInteger>> tokenRanges = TokenRangeMappingUtils.setupTokenRangeMap(partitioner, repFactor, instances); - TokenRangeMapping<RingInstance> tokenRange = new TokenRangeMapping<>(partitioner, - tokenRanges, - Collections.emptySet()); - - // This test proves that for any RF3 keyspace - replicationFactor3.addFailure(Range.openClosed(tokens[0], tokens[1]), instance1, "Complete Failure"); - replicationFactor3.addFailure(Range.openClosed(tokens[0], tokens[0].add(BigInteger.ONE)), instance2, "Failure 1"); - replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE), - tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2"); - assertTrue(replicationFactor3.getFailedRanges(tokenRange, CL.LOCAL_QUORUM, DATACENTER_1, repFactor).isEmpty()); - } @NotNull private static RingEntry mockRingEntry() diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java index 5b0b927..9972c54 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java @@ -42,7 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; -import 
org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.model.CassandraInstance; import org.apache.cassandra.spark.data.ReplicationFactor; @@ -222,7 +222,7 @@ public class StreamSessionConsistencyTest transportContext, "sessionId", RANGE, - new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()), + new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()), executor); } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java index 6c1c1f2..e907f10 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java @@ -217,13 +217,13 @@ public final class TokenRangeMappingUtils replicas.add("localhost" + (i + r) % instancesCount + ":9042"); } Map<String, List<String>> replicasPerDc = new HashMap<>(); - replicasPerDc.put("ignored", replicas); + replicasPerDc.put("dc1", replicas); ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken), String.valueOf(endToken), replicasPerDc); replicaInfoList.add(ri); String address = "localhost" + i; int port = 9042; String addressWithPort = address + ":" + port; - ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "ignored"); + ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "dc1"); replicaMetadata.put(addressWithPort, rm); startToken = endToken; } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java index 0957258..9411350 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java @@ -57,6 +57,7 @@ import org.apache.cassandra.spark.bulkwriter.RingInstance; import org.apache.cassandra.spark.bulkwriter.SortedSSTableWriter; import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils; import org.apache.cassandra.spark.bulkwriter.TransportContext; +import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; import org.apache.cassandra.spark.common.client.ClientException; @@ -90,7 +91,8 @@ class BlobStreamSessionTest TokenRangeMapping<RingInstance> topology = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 3); MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(topology); BulkWriterContext spiedWriterContext = spy(bulkWriterContext); - ReplicaAwareFailureHandler<RingInstance> replicaAwareFailureHandler = new ReplicaAwareFailureHandler<>(bulkWriterContext.cluster().getPartitioner()); + ReplicaAwareFailureHandler<RingInstance> replicaAwareFailureHandler = + new MultiClusterReplicaAwareFailureHandler<>(bulkWriterContext.cluster().getPartitioner()); Range<BigInteger> range = Range.range(BigInteger.valueOf(100L), BoundType.OPEN, BigInteger.valueOf(199L), BoundType.CLOSED); JobInfo job = mock(JobInfo.class); when(job.getRestoreJobId()).thenReturn(jobId); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/FailureHandlerTextContext.java 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/FailureHandlerTextContext.java new file mode 100644 index 0000000..dbf43ab --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/FailureHandlerTextContext.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.token; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; + +class FailureHandlerTextContext +{ + TokenRangeMapping<RingInstance> topology; + JobInfo jobInfo; + ClusterInfo clusterInfo; + + FailureHandlerTextContext(TokenRangeMapping<RingInstance> topology, JobInfo jobInfo, ClusterInfo clusterInfo) + { + this.topology = topology; + this.jobInfo = jobInfo; + this.clusterInfo = clusterInfo; + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandlerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandlerTest.java new file mode 100644 index 0000000..71e99db --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandlerTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.bulkwriter.token; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CassandraClusterInfoGroup; +import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; + +import static org.apache.cassandra.spark.TestUtils.range; +import static org.apache.cassandra.spark.bulkwriter.RingInstanceTest.instance; +import static org.apache.cassandra.spark.bulkwriter.token.TokenRangeMappingTest.createTestMapping; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class MultiClusterReplicaAwareFailureHandlerTest +{ + private static final String DATACENTER_1 = "dc1"; + private final Partitioner partitioner = Partitioner.Murmur3Partitioner; + private MultiClusterReplicaAwareFailureHandler<RingInstance> handler = new MultiClusterReplicaAwareFailureHandler<>(partitioner); + + @Test + void testAddFailuresFromBothInstancesWithAndWithoutClusterIdFails() + { + RingInstance instance = instance(BigInteger.valueOf(10), "node1", DATACENTER_1); + RingInstance instanceWithClusterId = new RingInstance(instance.ringEntry(), 
"testCluster"); + assertThatNoException().isThrownBy(() -> handler.addFailure(range(0, 10), instance, "failure")); + assertThatThrownBy(() -> handler.addFailure(range(0, 10), instanceWithClusterId, "failure")) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("Cannot track failures from both instances with and without clusterId"); + + // create a new handler and add failures in the other order + handler = new MultiClusterReplicaAwareFailureHandler<>(partitioner); + assertThatNoException().isThrownBy(() -> handler.addFailure(range(0, 10), instanceWithClusterId, "failure")); + assertThatThrownBy(() -> handler.addFailure(range(0, 10), instance, "failure")) + .isExactlyInstanceOf(IllegalStateException.class) + .hasMessage("Cannot track failures from both instances with and without clusterId"); + } + + @Test + void testGetFailedInstanceFromMultipleClusters() + { + RingInstance instanceC1 = instance(BigInteger.ZERO, "node1", DATACENTER_1, "cluster1"); + RingInstance instanceC2 = instance(BigInteger.ZERO, "node1", DATACENTER_1, "cluster2"); + handler.addFailure(range(-10, 0), instanceC1, "failure in cluster1 instance"); + handler.addFailure(range(-10, 0), instanceC2, "failure in cluster2 instance"); + Set<RingInstance> failedInstances = handler.getFailedInstances(); + assertThat(failedInstances) + .hasSize(2) + .containsExactlyInAnyOrder(instanceC1, instanceC2); + } + + @Test + void testGetFailedRangesOfClusters() + { + testFailedRangeCheckWithTwoClusters(ctx -> { + Range<BigInteger> range = range(1, 10); // range value that does not overlap with the ranges in topology + Map<Range<BigInteger>, Set<RingInstance>> writeReplicasOfRangeAcrossClusters = ctx.topology.getWriteReplicasOfRange(range, DATACENTER_1); + assertThat(writeReplicasOfRangeAcrossClusters).hasSize(1); + Map<String, List<RingInstance>> writeReplicasPerCluster = writeReplicasOfRangeAcrossClusters + .values() + .stream() + .flatMap(Set::stream) + 
.collect(Collectors.groupingBy(RingInstance::clusterId)); + assertThat(writeReplicasPerCluster).hasSize(2).containsKeys("cluster1", "cluster2"); + handler.addFailure(range, writeReplicasPerCluster.get("cluster1").get(0), "failure in cluster1"); + handler.addFailure(range, writeReplicasPerCluster.get("cluster2").get(0), "failure in cluster2"); + assertThat(handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo)) + .describedAs("Each cluster has only 1 failure of the range, which is acceptable for LOCAL_QUORUM.") + .isEmpty(); + + // now cluster1 has 2 failures + handler.addFailure(range, writeReplicasPerCluster.get("cluster1").get(1), "another failure in cluster1"); + List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> + failedRanges = handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo); + assertThat(failedRanges) + .describedAs("Cluster1 has failed range") + .hasSize(1); + Set<RingInstance> failedInstances = failedRanges.get(0).failuresPerInstance.instances(); + assertThat(failedInstances) + .hasSize(2) + .containsExactlyInAnyOrder(writeReplicasPerCluster.get("cluster1").get(0), + writeReplicasPerCluster.get("cluster1").get(1)); + + // now cluster2 has 2 failures too + handler.addFailure(range, writeReplicasPerCluster.get("cluster2").get(1), "another failure in cluster2"); + failedRanges = handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo); + assertThat(failedRanges) + .describedAs("Both clusters have failed ranges") + .hasSize(2); + Set<RingInstance> failedInstancesInCluster2 = failedRanges.get(1).failuresPerInstance.instances(); + assertThat(failedInstancesInCluster2) + .hasSize(2) + .containsExactlyInAnyOrder(writeReplicasPerCluster.get("cluster2").get(0), + writeReplicasPerCluster.get("cluster2").get(1)); + }); + } + + private void testFailedRangeCheckWithTwoClusters(Consumer<FailureHandlerTextContext> test) + { + TokenRangeMapping<RingInstance> cluster1Topology = createTestMapping(0, 5, 
partitioner, "cluster1"); + TokenRangeMapping<RingInstance> cluster2Topology = createTestMapping(1, 5, partitioner, "cluster2"); + TokenRangeMapping<RingInstance> consolidatedTopology = TokenRangeMapping.consolidate(Arrays.asList(cluster1Topology, cluster2Topology)); + JobInfo jobInfo = mock(JobInfo.class); + when(jobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.LOCAL_QUORUM); + CoordinatedWriteConf cwc = mock(CoordinatedWriteConf.class); + CoordinatedWriteConf.ClusterConf cc = mock(CoordinatedWriteConf.ClusterConf.class); + when(cc.localDc()).thenReturn(DATACENTER_1); + when(cwc.cluster(any())).thenReturn(cc); + when(jobInfo.coordinatedWriteConf()).thenReturn(cwc); + CassandraClusterInfoGroup group = mock(CassandraClusterInfoGroup.class); + ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, + ImmutableMap.of(DATACENTER_1, 3)); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + when(clusterInfo.replicationFactor()).thenReturn(rf); + when(group.cluster(any())).thenReturn(clusterInfo); // return the same clusterinfo for both clusters (for test simplicity) + test.accept(new FailureHandlerTextContext(consolidatedTopology, jobInfo, group)); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandlerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandlerTest.java new file mode 100644 index 0000000..3813dc1 --- /dev/null +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandlerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.bulkwriter.token; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.spark.bulkwriter.ClusterInfo; +import org.apache.cassandra.spark.bulkwriter.JobInfo; +import org.apache.cassandra.spark.bulkwriter.RingInstance; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; + +import static org.apache.cassandra.spark.TestUtils.range; +import static org.apache.cassandra.spark.bulkwriter.RingInstanceTest.instance; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class SingleClusterReplicaAwareFailureHandlerTest +{ + private static final String DATACENTER_1 = "dc1"; + private final Partitioner partitioner = Partitioner.Murmur3Partitioner; + private final SingleClusterReplicaAwareFailureHandler<RingInstance> handler = new SingleClusterReplicaAwareFailureHandler<>(partitioner, null); + + @Test + void testGetFailedInstances() + { + RingInstance instance1 = instance(BigInteger.valueOf(10), "instance1", DATACENTER_1); + 
RingInstance instance2 = instance(BigInteger.valueOf(20), "instance2", DATACENTER_1); + assertThat(handler.isEmpty()).isTrue(); + handler.addFailure(range(0, 10), instance1, "instance 1 fails"); + assertThat(handler.isEmpty()).isFalse(); + handler.addFailure(range(0, 10), instance2, "instance 2 fails"); + handler.addFailure(range(10, 20), instance2, "instance 2 fails"); + assertThat(handler.getFailedInstances()) + .hasSize(2) + .containsExactlyInAnyOrder(instance1, instance2); + } + + @Test + public void testMinorityFailuresProduceNoFailedRanges() + { + testFailedRangeCheck(ctx -> { + Range<BigInteger> range = range(0, 10); + Map<Range<BigInteger>, Set<RingInstance>> writeReplicasOfRange = ctx.topology.getWriteReplicasOfRange(range, DATACENTER_1); + assertThat(writeReplicasOfRange).hasSize(1); + List<RingInstance> writeReplicas = new ArrayList<>(writeReplicasOfRange.values().iterator().next()); + assertThat(writeReplicas).hasSize(3); + RingInstance instance = writeReplicas.get(1); + // one failure per each distinct range; it should not fail as CL is LOCAL_QUORUM + Range<BigInteger> range1 = range(0, 3); + Range<BigInteger> range2 = range(3, 4); + Range<BigInteger> range3 = range(5, 6); + handler.addFailure(range1, instance, "Failure 1"); + handler.addFailure(range2, instance, "Failure 2"); + handler.addFailure(range3, instance, "Failure 3"); + assertThat(handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo)).isEmpty(); + }); + } + + @Test + public void testMajorityFailuresProduceFailedRanges() + { + testFailedRangeCheck(ctx -> { + Range<BigInteger> range = range(0, 10); + Map<Range<BigInteger>, Set<RingInstance>> writeReplicasOfRange = ctx.topology.getWriteReplicasOfRange(range, DATACENTER_1); + assertThat(writeReplicasOfRange).hasSize(1); + List<RingInstance> writeReplicas = new ArrayList<>(writeReplicasOfRange.values().iterator().next()); + assertThat(writeReplicas).hasSize(3); + // the majority of the replicas of range + RingInstance instance1 = 
writeReplicas.get(1); + RingInstance instance2 = writeReplicas.get(2); + + handler.addFailure(range, instance1, "fails on instance 1"); + handler.addFailure(range, instance2, "fails on instance 2"); + List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> + failedRanges = handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo); + assertThat(failedRanges).isNotEmpty(); + }); + } + + private void testFailedRangeCheck(Consumer<FailureHandlerTextContext> test) + { + + TokenRangeMapping<RingInstance> topology = TokenRangeMappingTest.createTestMapping(0, 5, partitioner, null); + JobInfo jobInfo = mock(JobInfo.class); + when(jobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.LOCAL_QUORUM); + when(jobInfo.getLocalDC()).thenReturn(DATACENTER_1); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy, + ImmutableMap.of(DATACENTER_1, 3)); + when(clusterInfo.replicationFactor()).thenReturn(rf); + test.accept(new FailureHandlerTextContext(topology, jobInfo, clusterInfo)); + } +} diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java index b80538d..92f9aba 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils; import org.apache.cassandra.spark.common.model.NodeState; import org.apache.cassandra.spark.data.partitioner.Partitioner; +import static org.apache.cassandra.spark.TestUtils.range; import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.Assertions.assertThatThrownBy; @@ -219,7 +220,7 @@ class TokenRangeMappingTest return createTestMapping(0L, instanceCount, partitioner, null); } - private TokenRangeMapping<RingInstance> createTestMapping(long startToken, int instanceCount, Partitioner partitioner, String clusterId) + public static TokenRangeMapping<RingInstance> createTestMapping(long startToken, int instanceCount, Partitioner partitioner, String clusterId) { return TokenRangeMapping.create( () -> TokenRangeMappingUtils.mockSimpleTokenRangeReplicasResponse(startToken, instanceCount, 3), @@ -227,11 +228,6 @@ class TokenRangeMappingTest metadata -> new RingInstance(metadata, clusterId)); } - private Range<BigInteger> range(long start, long end) - { - return Range.openClosed(BigInteger.valueOf(start), BigInteger.valueOf(end)); - } - private List<RingInstance> getSoleValue(Map<?, List<RingInstance>> map) { if (map.isEmpty()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org