This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4624a17  CASSANDRA-19933: Support aggregated consistency validation for multiple clusters (#86)
4624a17 is described below

commit 4624a17098e055e0abf9a6025451d4352cb9c147
Author: Yifan Cai <y...@apache.org>
AuthorDate: Tue Sep 24 23:50:35 2024 -0700

    CASSANDRA-19933: Support aggregated consistency validation for multiple clusters (#86)
    
    Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSANDRA-19933
---
 CHANGES.txt                                        |   1 +
 .../spark/common/model/CassandraInstance.java      |   5 -
 cassandra-analytics-core/build.gradle              |   4 +
 .../spark/bulkwriter/BulkWriteValidator.java       |   8 +-
 .../spark/bulkwriter/BulkWriterContextFactory.java |  12 +-
 .../bulkwriter/CassandraBulkSourceRelation.java    |   3 +-
 .../cassandra/spark/bulkwriter/RecordWriter.java   |   3 +-
 .../cassandra/spark/bulkwriter/RingInstance.java   |   1 +
 .../MultiClusterReplicaAwareFailureHandler.java    | 135 ++++++++++++++
 .../token/ReplicaAwareFailureHandler.java          | 152 +++++-----------
 .../SingleClusterReplicaAwareFailureHandler.java   | 196 +++++++++++++++++++++
 .../spark/bulkwriter/token/TokenRangeMapping.java  |  24 ++-
 .../spark/sparksql/CassandraDataSink.java          |   2 +-
 .../java/org/apache/cassandra/spark/TestUtils.java |  94 ++++++----
 .../spark/bulkwriter/BulkWriteValidatorTest.java   |   3 +-
 .../spark/bulkwriter/CassandraClusterInfoTest.java |   8 +-
 .../spark/bulkwriter/DirectStreamSessionTest.java  |   3 +-
 .../ImportCompletionCoordinatorTest.java           |   4 +-
 .../spark/bulkwriter/MockBulkWriterContext.java    |   1 +
 .../spark/bulkwriter/RingInstanceTest.java         | 102 ++---------
 .../bulkwriter/StreamSessionConsistencyTest.java   |   4 +-
 .../spark/bulkwriter/TokenRangeMappingUtils.java   |   4 +-
 .../blobupload/BlobStreamSessionTest.java          |   4 +-
 .../token/FailureHandlerTextContext.java           |  38 ++++
 ...MultiClusterReplicaAwareFailureHandlerTest.java | 155 ++++++++++++++++
 ...ingleClusterReplicaAwareFailureHandlerTest.java | 121 +++++++++++++
 .../bulkwriter/token/TokenRangeMappingTest.java    |   8 +-
 27 files changed, 821 insertions(+), 274 deletions(-)
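
In short: ReplicaAwareFailureHandler becomes an abstract base class, its previous single-cluster logic moves into the package-private SingleClusterReplicaAwareFailureHandler, and the new MultiClusterReplicaAwareFailureHandler routes each failure to a per-cluster handler keyed by the instance's clusterId before aggregating consistency validation across clusters. A minimal sketch of the call pattern this establishes, assuming an existing BulkWriterContext named writerContext (illustrative only, not part of the patch):

    // Callers construct the multi-cluster handler, which also covers the
    // single-cluster case: instances without a clusterId use a default handler.
    ReplicaAwareFailureHandler<RingInstance> failureHandler =
        new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
    BulkWriteValidator writeValidator = new BulkWriteValidator(writerContext, failureHandler);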

diff --git a/CHANGES.txt b/CHANGES.txt
index a45595e..2cb65f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Support aggregated consistency validation for multiple clusters (CASSANDRA-19933)
  * Add transport extension for coordinated write (CASSANDRA-19923)
  * Support data partitioning for multiple clusters coordinated write (CASSANDRA-19910)
  * Add writer option COORDINATED_WRITE_CONFIG to define coordinated write to multiple Cassandra clusters (CASSANDRA-19909)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
index c69b6eb..fed2f2d 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
@@ -29,11 +29,6 @@ public interface CassandraInstance extends TokenOwner
      */
     @Nullable String clusterId();
 
-    default boolean hasClusterId()
-    {
-        return clusterId() != null;
-    }
-
     String nodeName();
 
     String datacenter();
diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle
index b0f9c9c..e33a1ac 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -147,6 +147,10 @@ test {
             destination = destDir
         }
     }
+
+    testLogging {
+        events "started", "passed", "skipped", "failed"
+    }
 }
 
 /* Start: JaCoCo check */
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
index 56e1989..a8377a6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
@@ -32,16 +32,18 @@ import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
 
+/**
+ * A validator for bulk write results against the target cluster(s).
+ */
 public class BulkWriteValidator
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteValidator.class);
 
+    private final ClusterInfo cluster;
     private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
     private final JobInfo job;
     private String phase = "Initializing";
 
-    private final ClusterInfo cluster;
-
     public BulkWriteValidator(BulkWriterContext bulkWriterContext,
                               ReplicaAwareFailureHandler<RingInstance> failureHandler)
     {
@@ -58,7 +60,7 @@ public class BulkWriteValidator
                                         ClusterInfo cluster)
     {
         List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges =
-        failureHandler.getFailedRanges(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor());
+        failureHandler.getFailedRanges(tokenRangeMapping, job, cluster);
 
         if (failedRanges.isEmpty())
         {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
index 9d710bd..72b360c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriterContextFactory.java
@@ -38,8 +38,6 @@ public class BulkWriterContextFactory
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriterContextFactory.class);
 
-    public static final BulkWriterContextFactory INSTANCE = new BulkWriterContextFactory();
-
     @NotNull
     public BulkWriterContext createBulkWriterContext(@NotNull SparkContext sparkContext,
                                                      @NotNull Map<String, String> options,
@@ -47,9 +45,9 @@ public class BulkWriterContextFactory
     {
         Preconditions.checkNotNull(schema);
 
-        BulkWriterContext bulkWriterContext;
-        BulkSparkConf conf = new BulkSparkConf(sparkContext.getConf(), options);
+        BulkSparkConf conf = createBulkSparkConf(sparkContext, options);
         int sparkDefaultParallelism = sparkContext.defaultParallelism();
+        BulkWriterContext bulkWriterContext;
         if (conf.isCoordinatedWriteConfigured())
         {
             LOGGER.info("Initializing bulk writer context for multi-clusters coordinated write");
@@ -67,6 +65,12 @@ public class BulkWriterContextFactory
         return bulkWriterContext;
     }
 
+    @NotNull
+    protected BulkSparkConf createBulkSparkConf(@NotNull SparkContext sparkContext, @NotNull Map<String, String> options)
+    {
+        return new BulkSparkConf(sparkContext.getConf(), options);
+    }
+
     @NotNull
     protected BulkWriterContext createBulkWriterContext(@NotNull BulkSparkConf conf, StructType schema, int sparkDefaultParallelism)
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 3eb39c1..51d18b8 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -39,6 +39,7 @@ import o.a.c.sidecar.client.shaded.common.data.RestoreJobStatus;
 import o.a.c.sidecar.client.shaded.common.request.data.CreateRestoreJobRequestPayload;
 import o.a.c.sidecar.client.shaded.common.request.data.UpdateRestoreJobRequestPayload;
 import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult;
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.common.client.ClientException;
 import org.apache.cassandra.spark.transports.storage.extensions.StorageTransportConfiguration;
@@ -78,7 +79,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
         this.sqlContext = sqlContext;
         this.sparkContext = JavaSparkContext.fromSparkContext(sqlContext.sparkContext());
         this.broadcastContext = sparkContext.<BulkWriterContext>broadcast(writerContext);
-        ReplicaAwareFailureHandler<RingInstance> failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
+        ReplicaAwareFailureHandler<RingInstance> failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
         this.writeValidator = new BulkWriteValidator(writerContext, failureHandler);
         this.simpleTaskScheduler = new SimpleTaskScheduler();
     }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 2cd6abf..fe73b7b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.bulkwriter.util.TaskContextUtils;
@@ -96,7 +97,7 @@ public class RecordWriter
         this.columnNames = columnNames;
         this.taskContextSupplier = taskContextSupplier;
         this.tableWriterFactory = tableWriterFactory;
-        this.failureHandler = new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
+        this.failureHandler = new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
         this.writeValidator = new BulkWriteValidator(writerContext, failureHandler);
         this.digestAlgorithm = this.writerContext.job().digestAlgorithmSupplier().get();
         this.streamFutures = new HashMap<>();
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index 362e04c..2304b55 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -80,6 +80,7 @@ public class RingInstance implements CassandraInstance, Serializable
     }
 
     @Override
+    @Nullable
     public String clusterId()
     {
         return clusterId;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
new file mode 100644
index 0000000..36c0e93
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandler.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.token;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
+import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CassandraClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
+import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf.ClusterConf;
+import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A ReplicaAwareFailureHandler that can handle multiple clusters, including 
the case of single cluster.
+ *
+ * @param <I> CassandraInstance type
+ */
+public class MultiClusterReplicaAwareFailureHandler<I extends CassandraInstance> extends ReplicaAwareFailureHandler<I>
+{
+    // default failure handler for CassandraInstances that do not specify a clusterId
+    private final SingleClusterReplicaAwareFailureHandler<I> defaultFailureHandler;
+    private final Map<String, SingleClusterReplicaAwareFailureHandler<I>> failureHandlerPerCluster = new HashMap<>();
+
+    private final Partitioner partitioner;
+
+    public MultiClusterReplicaAwareFailureHandler(Partitioner partitioner)
+    {
+        this.partitioner = partitioner;
+        this.defaultFailureHandler = new SingleClusterReplicaAwareFailureHandler<>(partitioner, null);
+    }
+
+    @Override
+    public synchronized void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage)
+    {
+        String clusterId = instance.clusterId();
+        if (clusterId != null)
+        {
+            Preconditions.checkState(defaultFailureHandler.isEmpty(),
+                                     "Cannot track failures from both instances with and without clusterId");
+            ReplicaAwareFailureHandler<I> handler = failureHandlerPerCluster
+                                                    .computeIfAbsent(clusterId, k -> new SingleClusterReplicaAwareFailureHandler<>(partitioner, clusterId));
+            handler.addFailure(tokenRange, instance, errMessage);
+        }
+        else
+        {
+            Preconditions.checkState(failureHandlerPerCluster.isEmpty(),
+                                     "Cannot track failures from both instances with and without clusterId");
+            defaultFailureHandler.addFailure(tokenRange, instance, errMessage);
+        }
+    }
+
+    @Override
+    public synchronized Set<I> getFailedInstances()
+    {
+        if (failureHandlerPerCluster.isEmpty())
+        {
+            return defaultFailureHandler.getFailedInstances();
+        }
+
+        // failed instances merged from all clusters
+        HashSet<I> failedInstances = new HashSet<>();
+        failureHandlerPerCluster.values().forEach(handler -> failedInstances.addAll(handler.getFailedInstances()));
+        return failedInstances;
+    }
+
+    @Override
+    public synchronized List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange>
+    getFailedRanges(TokenRangeMapping<I> tokenRangeMapping,
+                    JobInfo job,
+                    ClusterInfo cluster)
+    {
+        if (failureHandlerPerCluster.isEmpty())
+        {
+            return defaultFailureHandler.getFailedRanges(tokenRangeMapping, job, cluster);
+        }
+
+        List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange> failurePerRanges = new ArrayList<>();
+        CoordinatedWriteConf coordinatedWriteConf = job.coordinatedWriteConf();
+        Preconditions.checkState(coordinatedWriteConf != null,
+                                 "CoordinatedWriteConf is absent for multi-cluster write");
+        Preconditions.checkState(cluster instanceof CassandraClusterInfoGroup,
+                                 "Not a CassandraClusterInfoGroup for multi-cluster write");
+        CassandraClusterInfoGroup group = (CassandraClusterInfoGroup) cluster;
+        failureHandlerPerCluster.forEach((clusterId, handler) -> {
+            ClusterConf clusterConf = Objects.requireNonNull(coordinatedWriteConf.cluster(clusterId),
+                                                             () -> "ClusterConf is not found for " + clusterId);
+            ClusterInfo clusterInfo = Objects.requireNonNull(group.cluster(clusterId),
+                                                             () -> "ClusterInfo is not found for " + clusterId);
+            ReplicationFactor rf = clusterInfo.replicationFactor();
+            failurePerRanges.addAll(handler.getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), clusterConf.localDc(), rf));
+        });
+        return failurePerRanges;
+    }
+
+    @Override
+    protected List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange>
+    getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping,
+                            ConsistencyLevel cl,
+                            @Nullable String localDC,
+                            ReplicationFactor replicationFactor)
+    {
+        throw new UnsupportedOperationException("Not implemented for MultiClusterReplicaAwareFailureHandler");
+    }
+}
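
The routing contract above, illustrated as a sketch (range is a token range, and instA/instB are CassandraInstance values obtained elsewhere; all three names are hypothetical):

    MultiClusterReplicaAwareFailureHandler<RingInstance> handler =
        new MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner);
    // instA.clusterId() == "cluster1": the failure is tracked by that cluster's handler
    handler.addFailure(range, instA, "stream failed");
    // instB.clusterId() == null: mixing instances with and without a clusterId
    // violates the Preconditions.checkState above and throws IllegalStateException
    handler.addFailure(range, instB, "stream failed");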
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
index f821ba9..45067b2 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
@@ -20,39 +20,37 @@
 package org.apache.cassandra.spark.bulkwriter.token;
 
 import java.math.BigInteger;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
-import com.google.common.collect.RangeMap;
-import com.google.common.collect.TreeRangeMap;
 
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
-import org.apache.cassandra.spark.common.model.NodeStatus;
 import org.apache.cassandra.spark.data.ReplicationFactor;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.jetbrains.annotations.Nullable;
 
-public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
+/**
+ * Handles write failures of a bulk write; implementations track a single cluster or aggregate across clusters
+ * @param <I> CassandraInstance type
+ */
+public abstract class ReplicaAwareFailureHandler<I extends CassandraInstance>
 {
     public class FailuresPerInstance
     {
-        private final Multimap<Instance, String> errorMessagesPerInstance;
+        private final Multimap<I, String> errorMessagesPerInstance;
 
         public FailuresPerInstance()
         {
             this.errorMessagesPerInstance = ArrayListMultimap.create();
         }
 
-        public FailuresPerInstance(Multimap<Instance, String> errorMessagesPerInstance)
+        public FailuresPerInstance(Multimap<I, String> errorMessagesPerInstance)
         {
             this.errorMessagesPerInstance = ArrayListMultimap.create(errorMessagesPerInstance);
         }
@@ -62,26 +60,32 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
             return new FailuresPerInstance(this.errorMessagesPerInstance);
         }
 
-        public Set<Instance> instances()
+        public Set<I> instances()
         {
             return errorMessagesPerInstance.keySet();
         }
 
-        public void addErrorForInstance(Instance instance, String errorMessage)
+        public void addErrorForInstance(I instance, String errorMessage)
         {
             errorMessagesPerInstance.put(instance, errorMessage);
         }
 
-        public boolean hasError(Instance instance)
+        public boolean hasError(I instance)
         {
             return errorMessagesPerInstance.containsKey(instance)
                    && !errorMessagesPerInstance.get(instance).isEmpty();
         }
 
-        public void forEachInstance(BiConsumer<Instance, Collection<String>> instanceErrorsConsumer)
+        public void forEachInstance(BiConsumer<I, Collection<String>> instanceErrorsConsumer)
         {
             errorMessagesPerInstance.asMap().forEach(instanceErrorsConsumer);
         }
+
+        @Override
+        public String toString()
+        {
+            return errorMessagesPerInstance.toString();
+        }
     }
 
     public class ConsistencyFailurePerRange
@@ -96,13 +100,18 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
         }
     }
 
-    // failures captures per each range; note that failures do not necessarily fail a range, as long as consistency level is considered
-    private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create();
-
-    public ReplicaAwareFailureHandler(Partitioner partitioner)
-    {
-        rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance());
-    }
+    /**
+     * Given the number of failed instances for each token range, validates if the consistency guarantees are maintained for the job
+     *
+     * @param tokenRangeMapping the mapping of token ranges to a Cassandra instance
+     * @param job               the job to verify
+     * @param cluster           cluster info
+     * @return list of failed token ranges that break consistency. This should ideally be empty for a
+     * successful operation.
+     */
+    public abstract List<ConsistencyFailurePerRange> getFailedRanges(TokenRangeMapping<I> tokenRangeMapping,
+                                                                     JobInfo job,
+                                                                     ClusterInfo cluster);
 
     /**
      * Adds a new token range as a failed token range, with errors on given instance.
@@ -115,32 +124,16 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
      * one returned from failedRangesMap map. As our range could be overlapping partially and the map could be used
      * by other range.
      *
-     * @param tokenRange  the range which failed
-     * @param casInstance the instance on which the range failed
-     * @param errMessage  the error that occurred for this particular range/instance pair
+     * @param tokenRange the range which failed
+     * @param instance   the instance on which the range failed
+     * @param errMessage the error that occurred for this particular range/instance pair
      */
-    public synchronized void addFailure(Range<BigInteger> tokenRange, Instance casInstance, String errMessage)
-    {
-        RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange);
-        RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create();
+    public abstract void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage);
 
-        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet())
-        {
-            FailuresPerInstance newErrorMap = entry.getValue().copy();
-            newErrorMap.addErrorForInstance(casInstance, errMessage);
-            mappingsToAdd.put(entry.getKey(), newErrorMap);
-        }
-        rangeFailuresMap.putAll(mappingsToAdd);
-    }
-
-    public Set<Instance> getFailedInstances()
-    {
-        return rangeFailuresMap.asMapOfRanges().values()
-                               .stream()
-                               .map(FailuresPerInstance::instances)
-                               .flatMap(Collection::stream)
-                               .collect(Collectors.toSet());
-    }
+    /**
+     * @return the set of all failed instances
+     */
+    public abstract Set<I> getFailedInstances();
 
     /**
      * Given the number of failed instances for each token range, validates if the consistency guarantees are maintained
@@ -149,69 +142,12 @@ public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
      * @param tokenRangeMapping the mapping of token ranges to a Cassandra instance
      * @param cl                the desired consistency level
      * @param localDC           the local datacenter
+     * @param replicationFactor replication of the enclosing keyspace
      * @return list of failed token ranges that break consistency. This should ideally be empty for a
      * successful operation.
      */
-    public synchronized List<ConsistencyFailurePerRange>
-    getFailedRanges(TokenRangeMapping<Instance> tokenRangeMapping,
-                    ConsistencyLevel cl,
-                    @Nullable String localDC,
-                    ReplicationFactor replicationFactor)
-    {
-        Preconditions.checkArgument((cl.isLocal() && localDC != null) || (!cl.isLocal() && localDC == null),
-                                    "Not a valid pair of consistency level configuration. " +
-                                    "Consistency level: " + cl + " localDc: " + localDC);
-        List<ConsistencyFailurePerRange> failedRanges = new ArrayList<>();
-
-        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> failedRangeEntry : rangeFailuresMap.asMapOfRanges()
-                                                                                                  .entrySet())
-        {
-            Range<BigInteger> range = failedRangeEntry.getKey();
-            FailuresPerInstance errorMap = failedRangeEntry.getValue();
-            Set<Instance> failedReplicas = errorMap.instances()
-                                                   .stream()
-                                                   .filter(errorMap::hasError)
-                                                   .collect(Collectors.toSet());
-
-            // no failures found for the range; skip consistency check on this one and move on
-            if (failedReplicas.isEmpty())
-            {
-                continue;
-            }
-
-            tokenRangeMapping.getWriteReplicasOfRange(range, localDC)
-                             .forEach((subrange, liveAndDown) -> {
-                                 if (!checkSubrange(cl, localDC, replicationFactor, liveAndDown, failedReplicas))
-                                 {
-                                     failedRanges.add(new ConsistencyFailurePerRange(subrange, errorMap));
-                                 }
-                             });
-        }
-
-        return failedRanges;
-    }
-
-    /**
-     * Check whether a CL can be satisfied for each sub-range.
-     * @return true if consistency is satisfied; false otherwise.
-     */
-    private boolean checkSubrange(ConsistencyLevel cl,
-                                  @Nullable String localDC,
-                                  ReplicationFactor replicationFactor,
-                                  Set<Instance> liveAndDown,
-                                  Set<Instance> failedReplicas)
-    {
-        Set<Instance> liveReplicas = liveAndDown.stream()
-                                                .filter(instance -> instance.nodeStatus() == NodeStatus.UP)
-                                                .collect(Collectors.toSet());
-        Set<Instance> pendingReplicas = liveAndDown.stream()
-                                                   .filter(instance -> instance.nodeState().isPending)
-                                                   .collect(Collectors.toSet());
-        // success is assumed if not failed
-        Set<Instance> succeededReplicas = liveReplicas.stream()
-                                                      .filter(instance -> !failedReplicas.contains(instance))
-                                                      .collect(Collectors.toSet());
-
-        return cl.canBeSatisfied(succeededReplicas, pendingReplicas, replicationFactor, localDC);
-    }
+    protected abstract List<ConsistencyFailurePerRange> getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping,
+                                                                                ConsistencyLevel cl,
+                                                                                @Nullable String localDC,
+                                                                                ReplicationFactor replicationFactor);
 }
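
For orientation, this is the shape a concrete handler takes under the new contract (an illustrative no-op subclass, not part of the patch; imports from java.util and the packages above are assumed):

    class NoOpFailureHandler<I extends CassandraInstance> extends ReplicaAwareFailureHandler<I>
    {
        @Override
        public List<ConsistencyFailurePerRange> getFailedRanges(TokenRangeMapping<I> tokenRangeMapping, JobInfo job, ClusterInfo cluster)
        {
            // the public entry point derives CL, local DC and RF from the job and cluster
            return getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor());
        }

        @Override
        public void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage)
        {
            // records nothing
        }

        @Override
        public Set<I> getFailedInstances()
        {
            return Collections.emptySet();
        }

        @Override
        protected List<ConsistencyFailurePerRange> getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping,
                                                                           ConsistencyLevel cl,
                                                                           @Nullable String localDC,
                                                                           ReplicationFactor replicationFactor)
        {
            // never reports a broken range
            return Collections.emptyList();
        }
    }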
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java
new file mode 100644
index 0000000..7a35091
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandler.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.token;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
+import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.common.model.NodeStatus;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * ReplicaAwareFailureHandler for a single cluster
+ * The handler should be constructed by {@link MultiClusterReplicaAwareFailureHandler} only, hence package-private
+ * @param <I> CassandraInstance type
+ */
+class SingleClusterReplicaAwareFailureHandler<I extends CassandraInstance> extends ReplicaAwareFailureHandler<I>
+{
+    // failures captured per range; note that failures do not necessarily fail a range, as long as the consistency level can still be satisfied
+    @GuardedBy("this")
+    private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create();
+
+    @GuardedBy("this")
+    private boolean isEmpty = true;
+
+    @Nullable
+    private final String clusterId;
+
+    SingleClusterReplicaAwareFailureHandler(Partitioner partitioner, @Nullable String clusterId)
+    {
+        this.clusterId = clusterId;
+        rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance());
+    }
+
+    /**
+     * Check whether the handler contains any failure
+     * @return true if there is no failure; false otherwise.
+     */
+    public boolean isEmpty()
+    {
+        return isEmpty;
+    }
+
+    @Override
+    public List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange>
+    getFailedRanges(TokenRangeMapping<I> tokenRangeMapping, JobInfo job, ClusterInfo cluster)
+    {
+        return getFailedRangesInternal(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC(), cluster.replicationFactor());
+    }
+
+    @Override
+    public synchronized void addFailure(Range<BigInteger> tokenRange, I instance, String errMessage)
+    {
+        RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange);
+        RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create();
+
+        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet())
+        {
+            FailuresPerInstance newErrorMap = entry.getValue().copy();
+            newErrorMap.addErrorForInstance(instance, errMessage);
+            mappingsToAdd.put(entry.getKey(), newErrorMap);
+        }
+        rangeFailuresMap.putAll(mappingsToAdd);
+        isEmpty = false;
+    }
+
+    @Override
+    public synchronized Set<I> getFailedInstances()
+    {
+        if (isEmpty)
+        {
+            return Collections.emptySet();
+        }
+
+        return rangeFailuresMap.asMapOfRanges()
+                               .values()
+                               .stream()
+                               .map(FailuresPerInstance::instances)
+                               .flatMap(Collection::stream)
+                               .collect(Collectors.toSet());
+    }
+
+    @Override
+    protected synchronized List<ReplicaAwareFailureHandler<I>.ConsistencyFailurePerRange>
+    getFailedRangesInternal(TokenRangeMapping<I> tokenRangeMapping,
+                            ConsistencyLevel cl,
+                            @Nullable String localDC,
+                            ReplicationFactor replicationFactor)
+    {
+        Preconditions.checkArgument((cl.isLocal() && localDC != null) || (!cl.isLocal() && localDC == null),
+                                    "Not a valid pair of consistency level configuration. " +
+                                    "Consistency level: " + cl + " localDc: " + localDC);
+        List<ConsistencyFailurePerRange> failedRanges = new ArrayList<>();
+
+        if (isEmpty)
+        {
+            return failedRanges;
+        }
+
+        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> failedRangeEntry : rangeFailuresMap.asMapOfRanges().entrySet())
+        {
+            Range<BigInteger> range = failedRangeEntry.getKey();
+            FailuresPerInstance errorMap = failedRangeEntry.getValue();
+            Set<I> failedReplicas = errorMap.instances()
+                                            .stream()
+                                            .filter(errorMap::hasError)
+                                            .collect(Collectors.toSet());
+
+            // no failures found for the range; skip consistency check on this one and move on
+            if (failedReplicas.isEmpty())
+            {
+                continue;
+            }
+
+            tokenRangeMapping.getWriteReplicasOfRange(range, instance -> {
+                                 boolean shouldKeep = localDC == null || instance.datacenter().equalsIgnoreCase(localDC);
+                                 if (shouldKeep && clusterId != null)
+                                 {
+                                     shouldKeep = clusterId.equalsIgnoreCase(instance.clusterId());
+                                 }
+                                 return shouldKeep;
+                             })
+                             .forEach((subrange, liveAndDown) -> {
+                                 if (!checkSubrange(cl, localDC, replicationFactor, liveAndDown, failedReplicas))
+                                 {
+                                     failedRanges.add(new ConsistencyFailurePerRange(subrange, errorMap));
+                                 }
+                             });
+        }
+
+        return failedRanges;
+    }
+
+    /**
+     * Check whether a CL can be satisfied for each sub-range.
+     * @return true if consistency is satisfied; false otherwise.
+     */
+    private boolean checkSubrange(ConsistencyLevel cl,
+                                  @Nullable String localDC,
+                                  ReplicationFactor replicationFactor,
+                                  Set<I> liveAndDown,
+                                  Set<I> failedReplicas)
+    {
+        Set<I> pendingReplicas = new HashSet<>();
+        // success is assumed if not failed
+        Set<I> succeededReplicas = new HashSet<>();
+        for (I instance : liveAndDown)
+        {
+            if (instance.nodeStatus() == NodeStatus.UP && !failedReplicas.contains(instance))
+            {
+                succeededReplicas.add(instance);
+            }
+            if (instance.nodeState().isPending)
+            {
+                pendingReplicas.add(instance);
+            }
+        }
+
+        return cl.canBeSatisfied(succeededReplicas, pendingReplicas, replicationFactor, localDC);
+    }
+}
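
Concretely, the tolerance this check encodes: a replica counts as succeeded only when it is UP and not failed, pending replicas are tracked separately, and ConsistencyLevel.canBeSatisfied makes the final call. With RF 3 and QUORUM, one failed replica per sub-range is absorbed while a second one causes the sub-range to be reported by getFailedRanges. A sketch, assuming the CL enum exposes QUORUM, a ReplicationFactor rf of 3, and replicas r1 and r2 (all hypothetical here):

    // one failure out of three replicas: QUORUM (2) still satisfied
    boolean ok = ConsistencyLevel.CL.QUORUM.canBeSatisfied(ImmutableSet.of(r1, r2), ImmutableSet.of(), rf, null);
    // two failures: only one successful replica left, QUORUM violated
    boolean broken = !ConsistencyLevel.CL.QUORUM.canBeSatisfied(ImmutableSet.of(r1), ImmutableSet.of(), rf, null);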
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
index 367c385..caf1f65 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
@@ -29,9 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -204,21 +206,35 @@ public class TokenRangeMapping<I extends CassandraInstance> implements Serializa
      * @param localDc local DC name to filter out non-local-DC instances. The parameter is optional. When not present, i.e. null, no filtering is applied
      * @return the write replicas of sub-ranges
      */
+    @VisibleForTesting
     public Map<Range<BigInteger>, Set<I>> getWriteReplicasOfRange(Range<BigInteger> range, @Nullable String localDc)
+    {
+        return getWriteReplicasOfRange(range, instance -> localDc == null || instance.datacenter().equalsIgnoreCase(localDc));
+    }
+
+    /**
+     * Get write replica-sets of sub-ranges that overlap with the input range.
+     *
+     * @param range range to check. The range can potentially overlap with multiple ranges.
+     *              For example, a down node adds one failure of a token range that covers multiple primary token ranges that replicate to it.
+     * @param instanceFilter predicate to filter the instances
+     * @return the write replicas of sub-ranges
+     */
+    public Map<Range<BigInteger>, Set<I>> getWriteReplicasOfRange(Range<BigInteger> range, @Nullable Predicate<I> instanceFilter)
     {
         Map<Range<BigInteger>, List<I>> subRangeReplicas = instancesByTokenRange.subRangeMap(range).asMapOfRanges();
-        Function<List<I>, Set<I>> inDcInstances = instances -> {
-            if (localDc != null)
+        Function<List<I>, Set<I>> filterAndTransform = instances -> {
+            if (instanceFilter != null)
             {
                 return instances.stream()
-                                .filter(instance -> instance.datacenter().equalsIgnoreCase(localDc))
+                                .filter(instanceFilter)
                                 .collect(Collectors.toSet());
             }
             return new HashSet<>(instances);
         };
         return subRangeReplicas.entrySet()
                                .stream()
-                               .collect(Collectors.toMap(Map.Entry::getKey, entry -> inDcInstances.apply(entry.getValue())));
+                               .collect(Collectors.toMap(Map.Entry::getKey, entry -> filterAndTransform.apply(entry.getValue())));
     }
 
     // Used for writes
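
Example use of the new predicate-based overload (a sketch; mapping is an existing TokenRangeMapping<RingInstance> and range a token range, both assumed in scope):

    // keep only replicas in datacenter DC1; passing null instead of a predicate disables filtering
    Map<Range<BigInteger>, Set<RingInstance>> replicasByRange =
        mapping.getWriteReplicasOfRange(range, instance -> "DC1".equalsIgnoreCase(instance.datacenter()));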
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java
index bdd6bc1..1791dae 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/CassandraDataSink.java
@@ -103,6 +103,6 @@ public class CassandraDataSink implements DataSourceRegister, CreatableRelationP
     @NotNull
     protected BulkWriterContextFactory factory()
     {
-        return BulkWriterContextFactory.INSTANCE;
+        return new BulkWriterContextFactory();
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
index 60266f0..d7c6bdf 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
@@ -38,6 +38,7 @@ import java.util.stream.Stream;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -67,13 +68,26 @@ import static org.quicktheories.generators.SourceDSL.arbitrary;
 
 public final class TestUtils
 {
-    private static final SparkSession SPARK = SparkSession.builder()
-                                                          .appName("Java Test")
-                                                          .config("spark.master", "local")
-                                                          // Spark is not case-sensitive by default, but we want to make it case-sensitive for
-                                                          // the quoted identifiers tests where we test mixed case
-                                                          .config("spark.sql.caseSensitive", "True")
-                                                          .getOrCreate();
+    private static class Holder
+    {
+        private static final SparkSession SPARK_SESSION = createSession();
+
+        static SparkSession createSession()
+        {
+            return SparkSession.builder()
+                               .appName("Java Test")
+                               .config("spark.master", "local")
+                               // Spark is not case-sensitive by default, but we want to make it case-sensitive for
+                               // the quoted identifiers tests where we test mixed case
+                               .config("spark.sql.caseSensitive", "True")
+                               .getOrCreate();
+        }
+    }
+
+    private static SparkSession session()
+    {
+        return Holder.SPARK_SESSION;
+    }
 
     private TestUtils()
     {
@@ -162,15 +176,17 @@ public final class TestUtils
                                                      Set<CqlField.CqlUdt> udts,
                                                      @Nullable String statsClass)
     {
-        DataFrameReader frameReader = SPARK.read().format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource")
-                                           .option("keyspace", keyspace)
-                                           .option("createStmt", createStmt)
-                                           .option("dirs", dir.toAbsolutePath().toString())
-                                           .option("version", version.toString())
-                                           .option("useBufferingInputStream", true)  // Use in the test system to test the BufferingInputStream
-                                           .option("partitioner", partitioner.name())
-                                           .option("udts", udts.stream().map(f -> f.createStatement(bridge.cassandraTypes(), keyspace))
-                                                               .collect(Collectors.joining("\n")));
+        DataFrameReader frameReader = session()
+                                      .read()
+                                      .format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource")
+                                      .option("keyspace", keyspace)
+                                      .option("createStmt", createStmt)
+                                      .option("dirs", dir.toAbsolutePath().toString())
+                                      .option("version", version.toString())
+                                      .option("useBufferingInputStream", true)  // Use in the test system to test the BufferingInputStream
+                                      .option("partitioner", partitioner.name())
+                                      .option("udts", udts.stream().map(f -> f.createStatement(bridge.cassandraTypes(), keyspace))
+                                                          .collect(Collectors.joining("\n")));
         if (statsClass != null)
         {
             frameReader = frameReader.option("statsClass", statsClass);
@@ -180,11 +196,12 @@ public final class TestUtils
 
     public static Dataset<Row> read(Path path, StructType schema)
     {
-        return SPARK.read()
-                    .format("parquet")
-                    .option("path", path.toString())
-                    .schema(schema)
-                    .load();
+        return session()
+               .read()
+               .format("parquet")
+               .option("path", path.toString())
+               .schema(schema)
+               .load();
     }
 
     // CHECKSTYLE IGNORE: Method with many parameters
@@ -200,18 +217,19 @@ public final class TestUtils
                                                 @Nullable String filterExpression,
                                                 @Nullable String... columns)
     {
-        DataFrameReader frameReader = SPARK.read()
-                                           .format("org.apache.cassandra.spark.sparksql.LocalDataSource")
-                                           .option("keyspace", keyspace)
-                                           .option("createStmt", createStatement)
-                                           .option("dirs", directory.toAbsolutePath().toString())
-                                           .option("version", version.toString())
-                                           .option("useBufferingInputStream", true)  // Use in the test system to test the BufferingInputStream
-                                           .option("partitioner", partitioner.name())
-                                           .option(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(), addLastModifiedTimestampColumn)
-                                           .option("udts", udts.stream()
-                                                               .map(udt -> udt.createStatement(bridge.cassandraTypes(), keyspace))
-                                                               .collect(Collectors.joining("\n")));
+        DataFrameReader frameReader = session()
+                                      .read()
+                                      .format("org.apache.cassandra.spark.sparksql.LocalDataSource")
+                                      .option("keyspace", keyspace)
+                                      .option("createStmt", createStatement)
+                                      .option("dirs", directory.toAbsolutePath().toString())
+                                      .option("version", version.toString())
+                                      .option("useBufferingInputStream", true)  // Use in the test system to test the BufferingInputStream
+                                      .option("partitioner", partitioner.name())
+                                      .option(SchemaFeatureSet.LAST_MODIFIED_TIMESTAMP.optionName(), addLastModifiedTimestampColumn)
+                                      .option("udts", udts.stream()
+                                                          .map(udt -> udt.createStatement(bridge.cassandraTypes(), keyspace))
+                                                          .collect(Collectors.joining("\n")));
         if (statsClass != null)
         {
             frameReader = frameReader.option("statsClass", statsClass);
@@ -364,4 +382,14 @@ public final class TestUtils
                           .substring(0, size)
                           .getBytes(StandardCharsets.UTF_8);
     }
+
+    public static Range<BigInteger> range(long start, long end)
+    {
+        return range(BigInteger.valueOf(start), BigInteger.valueOf(end));
+    }
+
+    public static Range<BigInteger> range(BigInteger start, BigInteger end)
+    {
+        return Range.openClosed(start, end);
+    }
 }
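
The new helpers keep test code terse; both lines below build the same open-closed range (illustrative):

    Range<BigInteger> viaHelper = TestUtils.range(1, 100);
    Range<BigInteger> direct = Range.openClosed(BigInteger.valueOf(1), BigInteger.valueOf(100));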
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
index ef6cb13..5b33859 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
@@ -26,6 +26,7 @@ import java.util.UUID;
 import org.junit.jupiter.api.Test;
 
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.data.QualifiedTableName;
@@ -77,7 +78,7 @@ class BulkWriteValidatorTest
         when(mockJobInfo.jobKeepAliveMinutes()).thenReturn(-1);
         when(mockWriterContext.job()).thenReturn(mockJobInfo);
 
-        ReplicaAwareFailureHandler<RingInstance> failureHandler = new ReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner);
+        ReplicaAwareFailureHandler<RingInstance> failureHandler = new MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner);
         BulkWriteValidator writerValidator = new BulkWriteValidator(mockWriterContext, failureHandler);
         assertThatThrownBy(() -> writerValidator.validateClOrFail(topology))
         .isExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
index 846b897..8497f81 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfoTest.java
@@ -19,20 +19,19 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.math.BigInteger;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Range;
 import org.junit.jupiter.api.Test;
 
 import o.a.c.sidecar.client.shaded.common.response.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.exception.TimeSkewTooLargeException;
 
+import static org.apache.cassandra.spark.TestUtils.range;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -76,11 +75,6 @@ public class CassandraClusterInfoTest
         return new MockClusterInfoForTimeSkew(allowanceMinutes, remoteNow);
     }
 
-    private static Range<BigInteger> range(long start, long end)
-    {
-        return Range.openClosed(BigInteger.valueOf(start), BigInteger.valueOf(end));
-    }
-
     private static class MockClusterInfoForTimeSkew extends CassandraClusterInfo
     {
         private CassandraContext cassandraContext;
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
index 95af8d9..2c85e75 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
@@ -278,7 +279,7 @@ public class DirectStreamSessionTest
     @NotNull
     private ReplicaAwareFailureHandler<RingInstance> replicaAwareFailureHandler()
     {
-        return new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
+        return new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner());
     }
 
     private DirectStreamSession createStreamSession(MockTableWriter.Creator writerCreator)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
index 7386c69..5a15198 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
@@ -56,7 +56,7 @@ import org.apache.cassandra.spark.bulkwriter.blobupload.BlobStreamResult;
 import org.apache.cassandra.spark.bulkwriter.blobupload.CreatedRestoreSlice;
 import org.apache.cassandra.spark.bulkwriter.blobupload.StorageClient;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
-import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -124,7 +124,7 @@ class ImportCompletionCoordinatorTest
         when(mockClusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(topology);
         when(mockWriterContext.job()).thenReturn(mockJobInfo);
 
-        writerValidator = new BulkWriteValidator(mockWriterContext, new ReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner));
+        writerValidator = new BulkWriteValidator(mockWriterContext, new MultiClusterReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner));
 
         // clients will not be used in this test class; mock is at the API method level
         BlobDataTransferApi api = new BlobDataTransferApi(mockJobInfo, mock(SidecarClient.class), mock(StorageClient.class));
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 9fd90ad..6f33183 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -227,6 +227,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
         return consistencyLevel;
     }
 
+    // todo: yifan - unused?
     public void setConsistencyLevel(ConsistencyLevel.CL consistencyLevel)
     {
         this.consistencyLevel = consistencyLevel;
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index 022b44e..bd9efbb 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -25,45 +25,27 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Range;
 import org.junit.jupiter.api.Test;
 
 import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
-import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel.CL;
-import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
-import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
-import org.apache.cassandra.spark.data.ReplicationFactor;
-import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.jetbrains.annotations.NotNull;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class RingInstanceTest
 {
-    public static final String DATACENTER_1 = "DATACENTER1";
-
-    static List<RingInstance> getInstances(BigInteger[] tokens, String datacenter)
+    public static RingInstance instance(BigInteger token, String nodeName, String datacenter)
     {
-        List<RingInstance> instances = new ArrayList<>();
-        for (int token = 0; token < tokens.length; token++)
-        {
-            instances.add(instance(tokens[token], "node-" + token, datacenter));
-        }
-        return instances;
+        return instance(token, nodeName, datacenter, null);
     }
 
-    static RingInstance instance(BigInteger token, String nodeName, String datacenter)
+    public static RingInstance instance(BigInteger token, String nodeName, String datacenter, String clusterId)
     {
         return new RingInstance(new RingEntry.Builder()
                                 .datacenter(datacenter)
@@ -77,49 +59,8 @@ public class RingInstanceTest
                                 .owns("")
                                 .load("")
                                 .hostId("")
-                                .build());
-    }
-
-    static BigInteger[] getTokens(Partitioner partitioner, int nodes)
-    {
-        BigInteger[] tokens = new BigInteger[nodes];
-
-        for (int node = 0; node < nodes; node++)
-        {
-            tokens[node] = partitioner == Partitioner.Murmur3Partitioner
-                           ? getMurmur3Token(nodes, node)
-                           : getRandomToken(nodes, node);
-        }
-        return tokens;
-    }
-
-    static BigInteger getRandomToken(int nodes, int index)
-    {
-        // ((2^127 / nodes) * i)
-        return ((BigInteger.valueOf(2).pow(127)).divide(BigInteger.valueOf(nodes))).multiply(BigInteger.valueOf(index));
-    }
-
-    static BigInteger getMurmur3Token(int nodes, int index)
-    {
-        // (((2^64 / n) * i) - 2^63)
-        return (((BigInteger.valueOf(2).pow(64)).divide(BigInteger.valueOf(nodes)))
-                .multiply(BigInteger.valueOf(index))).subtract(BigInteger.valueOf(2).pow(63));
-    }
-
-    private static ReplicaAwareFailureHandler<RingInstance> ntsStrategyHandler(Partitioner partitioner)
-    {
-        return new ReplicaAwareFailureHandler<>(partitioner);
-    }
-
-    private static Map<String, Integer> ntsOptions(String[] names, int[] values)
-    {
-        assert names.length == values.length : "Invalid replication options - array lengths do not match";
-        Map<String, Integer> options = Maps.newHashMap();
-        for (int name = 0; name < names.length; name++)
-        {
-            options.put(names[name], values[name]);
-        }
-        return options;
+                                .build(),
+                                clusterId);
     }
 
     @Test
@@ -177,14 +118,14 @@ public class RingInstanceTest
     }
 
     @Test
-    public void testHasClusterId()
+    public void testGetClusterId()
     {
         RingEntry ringEntry = mockRingEntry();
         RingInstance instance = new RingInstance(ringEntry);
-        assertFalse(instance.hasClusterId());
+        assertNull(instance.clusterId());
 
         RingInstance instanceWithClusterId = new RingInstance(ringEntry, "cluster1");
-        assertTrue(instanceWithClusterId.hasClusterId());
+        assertNotNull(instanceWithClusterId.clusterId());
         assertEquals("cluster1", instanceWithClusterId.clusterId());
     }
 
@@ -220,30 +161,7 @@ public class RingInstanceTest
         assertEquals(1, newErrorMap.keySet().size());
     }
 
-    @Test
-    public void testMultipleFailuresSingleInstanceSucceedRF3()
-    {
-        Partitioner partitioner = Partitioner.Murmur3Partitioner;
-        BigInteger[] tokens = getTokens(partitioner, 5);
-        List<RingInstance> instances = getInstances(tokens, DATACENTER_1);
-        RingInstance instance1 = instances.get(0);
-        RingInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter());
-        ReplicaAwareFailureHandler<RingInstance> replicationFactor3 = ntsStrategyHandler(partitioner);
-        ReplicationFactor repFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
-                                                            ntsOptions(new String[]{DATACENTER_1 }, new int[]{3 }));
-        Multimap<RingInstance, Range<BigInteger>> tokenRanges = TokenRangeMappingUtils.setupTokenRangeMap(partitioner, repFactor, instances);
-        TokenRangeMapping<RingInstance> tokenRange = new TokenRangeMapping<>(partitioner,
-                                                                             tokenRanges,
-                                                                             Collections.emptySet());
-
-        // This test proves that for any RF3 keyspace
-        replicationFactor3.addFailure(Range.openClosed(tokens[0], tokens[1]), instance1, "Complete Failure");
-        replicationFactor3.addFailure(Range.openClosed(tokens[0], tokens[0].add(BigInteger.ONE)), instance2, "Failure 1");
-        replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE),
-                                                       tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2");
 
-        assertTrue(replicationFactor3.getFailedRanges(tokenRange, CL.LOCAL_QUORUM, DATACENTER_1, repFactor).isEmpty());
-    }
 
     @NotNull
     private static RingEntry mockRingEntry()
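
The deleted RF3 test asserted that repeated failures confined to one replica leave getFailedRanges empty under LOCAL_QUORUM; equivalent coverage now appears to live in the new SingleClusterReplicaAwareFailureHandlerTest later in this patch. A sketch of the quorum arithmetic behind that assertion (a hypothetical helper, not code from the commit):

    final class QuorumMathSketch
    {
        // Hypothetical helper, not in this patch. With RF = 3, quorum = 3 / 2 + 1 = 2,
        // so a token range tolerates 3 - 2 = 1 failed replica before the range fails.
        static boolean rangeFails(int replicationFactor, int failedReplicas)
        {
            int quorum = replicationFactor / 2 + 1;
            return failedReplicas > replicationFactor - quorum;
        }

        public static void main(String[] args)
        {
            System.out.println(rangeFails(3, 1)); // false: one failed replica, LOCAL_QUORUM still met
            System.out.println(rangeFails(3, 2)); // true: a majority of replicas failed
        }
    }
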
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 5b0b927..9972c54 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -42,7 +42,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
-import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
@@ -222,7 +222,7 @@ public class StreamSessionConsistencyTest
                                        transportContext,
                                        "sessionId",
                                        RANGE,
-                                       new ReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()),
+                                       new MultiClusterReplicaAwareFailureHandler<>(writerContext.cluster().getPartitioner()),
                                        executor);
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index 6c1c1f2..e907f10 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -217,13 +217,13 @@ public final class TokenRangeMappingUtils
                 replicas.add("localhost" + (i + r) % instancesCount + ":9042");
             }
             Map<String, List<String>> replicasPerDc = new HashMap<>();
-            replicasPerDc.put("ignored", replicas);
+            replicasPerDc.put("dc1", replicas);
             ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken), String.valueOf(endToken), replicasPerDc);
             replicaInfoList.add(ri);
             String address = "localhost" + i;
             int port = 9042;
             String addressWithPort = address + ":" + port;
-            ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "ignored");
+            ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "dc1");
             replicaMetadata.put(addressWithPort, rm);
             startToken = endToken;
         }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java
index 0957258..9411350 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/blobupload/BlobStreamSessionTest.java
@@ -57,6 +57,7 @@ import org.apache.cassandra.spark.bulkwriter.RingInstance;
 import org.apache.cassandra.spark.bulkwriter.SortedSSTableWriter;
 import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils;
 import org.apache.cassandra.spark.bulkwriter.TransportContext;
+import org.apache.cassandra.spark.bulkwriter.token.MultiClusterReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.client.ClientException;
@@ -90,7 +91,8 @@ class BlobStreamSessionTest
         TokenRangeMapping<RingInstance> topology = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 3);
         MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(topology);
         BulkWriterContext spiedWriterContext = spy(bulkWriterContext);
-        ReplicaAwareFailureHandler<RingInstance> replicaAwareFailureHandler = new ReplicaAwareFailureHandler<>(bulkWriterContext.cluster().getPartitioner());
+        ReplicaAwareFailureHandler<RingInstance> replicaAwareFailureHandler =
+        new MultiClusterReplicaAwareFailureHandler<>(bulkWriterContext.cluster().getPartitioner());
         Range<BigInteger> range = Range.range(BigInteger.valueOf(100L), BoundType.OPEN, BigInteger.valueOf(199L), BoundType.CLOSED);
         JobInfo job = mock(JobInfo.class);
         when(job.getRestoreJobId()).thenReturn(jobId);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/FailureHandlerTextContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/FailureHandlerTextContext.java
new file mode 100644
index 0000000..dbf43ab
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/FailureHandlerTextContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.token;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+
+class FailureHandlerTextContext
+{
+    TokenRangeMapping<RingInstance> topology;
+    JobInfo jobInfo;
+    ClusterInfo clusterInfo;
+
+    FailureHandlerTextContext(TokenRangeMapping<RingInstance> topology, JobInfo jobInfo, ClusterInfo clusterInfo)
+    {
+        this.topology = topology;
+        this.jobInfo = jobInfo;
+        this.clusterInfo = clusterInfo;
+    }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandlerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandlerTest.java
new file mode 100644
index 0000000..71e99db
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/MultiClusterReplicaAwareFailureHandlerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.token;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CassandraClusterInfoGroup;
+import org.apache.cassandra.spark.bulkwriter.coordinatedwrite.CoordinatedWriteConf;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+
+import static org.apache.cassandra.spark.TestUtils.range;
+import static org.apache.cassandra.spark.bulkwriter.RingInstanceTest.instance;
+import static org.apache.cassandra.spark.bulkwriter.token.TokenRangeMappingTest.createTestMapping;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class MultiClusterReplicaAwareFailureHandlerTest
+{
+    private static final String DATACENTER_1 = "dc1";
+    private final Partitioner partitioner = Partitioner.Murmur3Partitioner;
+    private MultiClusterReplicaAwareFailureHandler<RingInstance> handler = new MultiClusterReplicaAwareFailureHandler<>(partitioner);
+
+    @Test
+    void testAddFailuresFromBothInstancesWithAndWithoutClusterIdFails()
+    {
+        RingInstance instance = instance(BigInteger.valueOf(10), "node1", DATACENTER_1);
+        RingInstance instanceWithClusterId = new RingInstance(instance.ringEntry(), "testCluster");
+        assertThatNoException().isThrownBy(() -> handler.addFailure(range(0, 10), instance, "failure"));
+        assertThatThrownBy(() -> handler.addFailure(range(0, 10), instanceWithClusterId, "failure"))
+        .isExactlyInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot track failures from both instances with and without clusterId");
+
+        // create a new handler and add failures in the other order
+        handler = new MultiClusterReplicaAwareFailureHandler<>(partitioner);
+        assertThatNoException().isThrownBy(() -> handler.addFailure(range(0, 10), instanceWithClusterId, "failure"));
+        assertThatThrownBy(() -> handler.addFailure(range(0, 10), instance, "failure"))
+        .isExactlyInstanceOf(IllegalStateException.class)
+        .hasMessage("Cannot track failures from both instances with and without clusterId");
+    }
+
+    @Test
+    void testGetFailedInstanceFromMultipleClusters()
+    {
+        RingInstance instanceC1 = instance(BigInteger.ZERO, "node1", DATACENTER_1, "cluster1");
+        RingInstance instanceC2 = instance(BigInteger.ZERO, "node1", DATACENTER_1, "cluster2");
+        handler.addFailure(range(-10, 0), instanceC1, "failure in cluster1 instance");
+        handler.addFailure(range(-10, 0), instanceC2, "failure in cluster2 instance");
+        Set<RingInstance> failedInstances = handler.getFailedInstances();
+        assertThat(failedInstances)
+        .hasSize(2)
+        .containsExactlyInAnyOrder(instanceC1, instanceC2);
+    }
+
+    @Test
+    void testGetFailedRangesOfClusters()
+    {
+        testFailedRangeCheckWithTwoClusters(ctx -> {
+            Range<BigInteger> range = range(1, 10); // range value that does not overlap with the ranges in topology
+            Map<Range<BigInteger>, Set<RingInstance>> writeReplicasOfRangeAcrossClusters = ctx.topology.getWriteReplicasOfRange(range, DATACENTER_1);
+            assertThat(writeReplicasOfRangeAcrossClusters).hasSize(1);
+            Map<String, List<RingInstance>> writeReplicasPerCluster = writeReplicasOfRangeAcrossClusters
+                                                                      .values()
+                                                                      .stream()
+                                                                      .flatMap(Set::stream)
+                                                                      .collect(Collectors.groupingBy(RingInstance::clusterId));
+            assertThat(writeReplicasPerCluster).hasSize(2).containsKeys("cluster1", "cluster2");
+            handler.addFailure(range, writeReplicasPerCluster.get("cluster1").get(0), "failure in cluster1");
+            handler.addFailure(range, writeReplicasPerCluster.get("cluster2").get(0), "failure in cluster2");
+            assertThat(handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo))
+            .describedAs("Each cluster has only 1 failure of the range, which is acceptable for LOCAL_QUORUM.")
+            .isEmpty();
+
+            // now cluster1 has 2 failures
+            handler.addFailure(range, writeReplicasPerCluster.get("cluster1").get(1), "another failure in cluster1");
+            List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange>
+            failedRanges = handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo);
+            assertThat(failedRanges)
+            .describedAs("Cluster1 has failed range")
+            .hasSize(1);
+            Set<RingInstance> failedInstances = failedRanges.get(0).failuresPerInstance.instances();
+            assertThat(failedInstances)
+            .hasSize(2)
+            .containsExactlyInAnyOrder(writeReplicasPerCluster.get("cluster1").get(0),
+                                       writeReplicasPerCluster.get("cluster1").get(1));
+
+            // now cluster2 has 2 failures too
+            handler.addFailure(range, writeReplicasPerCluster.get("cluster2").get(1), "another failure in cluster2");
+            failedRanges = handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo);
+            assertThat(failedRanges)
+            .describedAs("Both clusters have failed ranges")
+            .hasSize(2);
+            Set<RingInstance> failedInstancesInCluster2 = failedRanges.get(1).failuresPerInstance.instances();
+            assertThat(failedInstancesInCluster2)
+            .hasSize(2)
+            .containsExactlyInAnyOrder(writeReplicasPerCluster.get("cluster2").get(0),
+                                       writeReplicasPerCluster.get("cluster2").get(1));
+        });
+    }
+
+    private void testFailedRangeCheckWithTwoClusters(Consumer<FailureHandlerTextContext> test)
+    {
+        TokenRangeMapping<RingInstance> cluster1Topology = createTestMapping(0, 5, partitioner, "cluster1");
+        TokenRangeMapping<RingInstance> cluster2Topology = createTestMapping(1, 5, partitioner, "cluster2");
+        TokenRangeMapping<RingInstance> consolidatedTopology = TokenRangeMapping.consolidate(Arrays.asList(cluster1Topology, cluster2Topology));
+        JobInfo jobInfo = mock(JobInfo.class);
+        when(jobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.LOCAL_QUORUM);
+        CoordinatedWriteConf cwc = mock(CoordinatedWriteConf.class);
+        CoordinatedWriteConf.ClusterConf cc = mock(CoordinatedWriteConf.ClusterConf.class);
+        when(cc.localDc()).thenReturn(DATACENTER_1);
+        when(cwc.cluster(any())).thenReturn(cc);
+        when(jobInfo.coordinatedWriteConf()).thenReturn(cwc);
+        CassandraClusterInfoGroup group = mock(CassandraClusterInfoGroup.class);
+        ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+                                                     ImmutableMap.of(DATACENTER_1, 3));
+        ClusterInfo clusterInfo = mock(ClusterInfo.class);
+        when(clusterInfo.replicationFactor()).thenReturn(rf);
+        when(group.cluster(any())).thenReturn(clusterInfo); // return the same clusterinfo for both clusters (for test simplicity)
+        test.accept(new FailureHandlerTextContext(consolidatedTopology, jobInfo, group));
+    }
+}
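
The aggregation exercised by this test can be pictured as one single-cluster handler per clusterId, with each failure routed by the failed instance's cluster and consistency validated per cluster before aggregation. The sketch below is one plausible shape for that routing, assuming only the constructors and accessors visible in this patch; the committed MultiClusterReplicaAwareFailureHandler may differ in detail:

    import java.math.BigInteger;
    import java.util.HashMap;
    import java.util.Map;

    import com.google.common.collect.Range;

    import org.apache.cassandra.spark.bulkwriter.RingInstance;
    import org.apache.cassandra.spark.bulkwriter.token.SingleClusterReplicaAwareFailureHandler;
    import org.apache.cassandra.spark.data.partitioner.Partitioner;

    class PerClusterRoutingSketch
    {
        private final Partitioner partitioner = Partitioner.Murmur3Partitioner;
        // One per-cluster handler per clusterId, created lazily on first failure.
        private final Map<String, SingleClusterReplicaAwareFailureHandler<RingInstance>> perCluster = new HashMap<>();

        void addFailure(Range<BigInteger> tokenRange, RingInstance instance, String message)
        {
            perCluster.computeIfAbsent(instance.clusterId(),
                                       id -> new SingleClusterReplicaAwareFailureHandler<>(partitioner, id))
                      .addFailure(tokenRange, instance, message);
        }
    }
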
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandlerTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandlerTest.java
new file mode 100644
index 0000000..3813dc1
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/SingleClusterReplicaAwareFailureHandlerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter.token;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.spark.bulkwriter.ClusterInfo;
+import org.apache.cassandra.spark.bulkwriter.JobInfo;
+import org.apache.cassandra.spark.bulkwriter.RingInstance;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+
+import static org.apache.cassandra.spark.TestUtils.range;
+import static org.apache.cassandra.spark.bulkwriter.RingInstanceTest.instance;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class SingleClusterReplicaAwareFailureHandlerTest
+{
+    private static final String DATACENTER_1 = "dc1";
+    private final Partitioner partitioner = Partitioner.Murmur3Partitioner;
+    private final SingleClusterReplicaAwareFailureHandler<RingInstance> handler = new SingleClusterReplicaAwareFailureHandler<>(partitioner, null);
+
+    @Test
+    void testGetFailedInstances()
+    {
+        RingInstance instance1 = instance(BigInteger.valueOf(10), "instance1", DATACENTER_1);
+        RingInstance instance2 = instance(BigInteger.valueOf(20), "instance2", DATACENTER_1);
+        assertThat(handler.isEmpty()).isTrue();
+        handler.addFailure(range(0, 10), instance1, "instance 1 fails");
+        assertThat(handler.isEmpty()).isFalse();
+        handler.addFailure(range(0, 10), instance2, "instance 2 fails");
+        handler.addFailure(range(10, 20), instance2, "instance 2 fails");
+        assertThat(handler.getFailedInstances())
+        .hasSize(2)
+        .containsExactlyInAnyOrder(instance1, instance2);
+    }
+
+    @Test
+    public void testMinorityFailuresProduceNoFailedRanges()
+    {
+        testFailedRangeCheck(ctx -> {
+            Range<BigInteger> range = range(0, 10);
+            Map<Range<BigInteger>, Set<RingInstance>> writeReplicasOfRange = ctx.topology.getWriteReplicasOfRange(range, DATACENTER_1);
+            assertThat(writeReplicasOfRange).hasSize(1);
+            List<RingInstance> writeReplicas = new ArrayList<>(writeReplicasOfRange.values().iterator().next());
+            assertThat(writeReplicas).hasSize(3);
+            RingInstance instance = writeReplicas.get(1);
+            // one failure per each distinct range; it should not fail as CL is LOCAL_QUORUM
+            Range<BigInteger> range1 = range(0, 3);
+            Range<BigInteger> range2 = range(3, 4);
+            Range<BigInteger> range3 = range(5, 6);
+            handler.addFailure(range1, instance, "Failure 1");
+            handler.addFailure(range2, instance, "Failure 2");
+            handler.addFailure(range3, instance, "Failure 3");
+            assertThat(handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo)).isEmpty();
+        });
+    }
+
+    @Test
+    public void testMajorityFailuresProduceFailedRanges()
+    {
+        testFailedRangeCheck(ctx -> {
+            Range<BigInteger> range = range(0, 10);
+            Map<Range<BigInteger>, Set<RingInstance>> writeReplicasOfRange = ctx.topology.getWriteReplicasOfRange(range, DATACENTER_1);
+            assertThat(writeReplicasOfRange).hasSize(1);
+            List<RingInstance> writeReplicas = new ArrayList<>(writeReplicasOfRange.values().iterator().next());
+            assertThat(writeReplicas).hasSize(3);
+            // the majority of the replicas of range
+            RingInstance instance1 = writeReplicas.get(1);
+            RingInstance instance2 = writeReplicas.get(2);
+
+            handler.addFailure(range, instance1, "fails on instance 1");
+            handler.addFailure(range, instance2, "fails on instance 2");
+            List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange>
+            failedRanges = handler.getFailedRanges(ctx.topology, ctx.jobInfo, ctx.clusterInfo);
+            assertThat(failedRanges).isNotEmpty();
+        });
+    }
+
+    private void testFailedRangeCheck(Consumer<FailureHandlerTextContext> test)
+    {
+
+        TokenRangeMapping<RingInstance> topology = TokenRangeMappingTest.createTestMapping(0, 5, partitioner, null);
+        JobInfo jobInfo = mock(JobInfo.class);
+        when(jobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.LOCAL_QUORUM);
+        when(jobInfo.getLocalDC()).thenReturn(DATACENTER_1);
+        ClusterInfo clusterInfo = mock(ClusterInfo.class);
+        ReplicationFactor rf = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+                                                     ImmutableMap.of(DATACENTER_1, 3));
+        when(clusterInfo.replicationFactor()).thenReturn(rf);
+        test.accept(new FailureHandlerTextContext(topology, jobInfo, clusterInfo));
+    }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java
index b80538d..92f9aba 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMappingTest.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.spark.bulkwriter.TokenRangeMappingUtils;
 import org.apache.cassandra.spark.common.model.NodeState;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 
+import static org.apache.cassandra.spark.TestUtils.range;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -219,7 +220,7 @@ class TokenRangeMappingTest
         return createTestMapping(0L, instanceCount, partitioner, null);
     }
 
-    private TokenRangeMapping<RingInstance> createTestMapping(long startToken, int instanceCount, Partitioner partitioner, String clusterId)
+    public static TokenRangeMapping<RingInstance> createTestMapping(long startToken, int instanceCount, Partitioner partitioner, String clusterId)
     {
         return TokenRangeMapping.create(
         () -> TokenRangeMappingUtils.mockSimpleTokenRangeReplicasResponse(startToken, instanceCount, 3),
@@ -227,11 +228,6 @@ class TokenRangeMappingTest
         metadata -> new RingInstance(metadata, clusterId));
     }
 
-    private Range<BigInteger> range(long start, long end)
-    {
-        return Range.openClosed(BigInteger.valueOf(start), BigInteger.valueOf(end));
-    }
-
     private List<RingInstance> getSoleValue(Map<?, List<RingInstance>> map)
     {
         if (map.isEmpty())
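
The test classes in this patch now import the shared range(start, end) helper from TestUtils instead of keeping private copies. Judging from the two duplicates deleted above, the shared helper presumably has the same open-closed semantics, roughly:

    import java.math.BigInteger;

    import com.google.common.collect.Range;

    final class RangeHelperSketch
    {
        // Mirrors the deleted private helpers: an open-closed (start, end] token range.
        static Range<BigInteger> range(long start, long end)
        {
            return Range.openClosed(BigInteger.valueOf(start), BigInteger.valueOf(end));
        }
    }
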


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

