yifan-c commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1416591381


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -207,46 +203,62 @@ private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingI
     private StreamSession maybeCreateStreamSession(TaskContext taskContext,
                                                    StreamSession streamSession,
                                                    Tuple2<DecoratedKey, Object[]> rowData,
-                                                   Set<Range<BigInteger>> newRanges,
-                                                   ReplicaAwareFailureHandler<RingInstance> failureHandler) throws IOException
+                                                   Set<Range<BigInteger>> subRanges,
+                                                   ReplicaAwareFailureHandler<RingInstance> failureHandler,
+                                                   List<StreamResult> results)
+    throws IOException, ExecutionException, InterruptedException
     {
         BigInteger token = rowData._1().getToken();
         Range<BigInteger> tokenRange = getTokenRange(taskContext);
 
         Preconditions.checkState(tokenRange.contains(token),
                                  String.format("Received Token %s outside of expected range %s", token, tokenRange));
 
-        // token range for this partition is not among the write-replica-set ranges
-        if (!newRanges.contains(tokenRange))
+        // We have split ranges likely resulting from pending nodes
+        // Evaluate creating a new session if the token from current row is part of a sub-range
+        if (subRanges.size() > 1)
         {
-            Set<Range<BigInteger>> subRanges = getIntersectingSubRanges(newRanges, tokenRange);
-            // We have split ranges - likely resulting from pending nodes
-            if (subRanges.size() > 1)
-            {
-                // Create session using sub-range that contains the token from current row
-                Range<BigInteger> matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
-                Preconditions.checkState(matchingRange != null,
-                                         String.format("Received Token %s outside of expected range %s", token, matchingRange));
+            // Create session using sub-range that contains the token from current row
+            Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+            Preconditions.checkState(matchingSubRange != null,
+                                     String.format("Received Token %s outside of expected range %s", token, matchingSubRange));

Review Comment:
   The `checkState` will never see `matchingSubRange == null`: at line#222, if no sub-range contains the token, the `get()` call already throws an exception.
   If the intent is to provide a friendlier error message, avoid calling `get()`; capture the result in an `Optional<Range<BigInteger>> matchingSubRangeOpt` and run `checkState` on the optional.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
-            List<RingInstance> exclusions = failureHandler.getFailedInstances();
             Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
                                                                        .stream()
-                                                                       .filter(e -> !exclusions.contains(e.getValue()))
                                                                        .map(Map.Entry::getKey)
                                                                        .collect(Collectors.toSet());
+            Range<BigInteger> tokenRange = getTokenRange(taskContext);
+            Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+                                               Collections.singleton(tokenRange) :
+                                               getIntersectingSubRanges(newRanges, tokenRange);
 
             while (dataIterator.hasNext())
             {
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
-                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
-
-                sessions.add(streamSession);
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results);
                 maybeCreateTableWriter(partitionId, baseDir);
                 writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
                 checkBatchSize(streamSession, partitionId, job);
             }
 
-            // Finalize SSTable for the last StreamSession
-            if (sstableWriter != null || (streamSession != null && batchSize != 0))
+            // Cleanup SSTable writer and schedule the last stream

Review Comment:
   "Cleanup SSTable writer" reads wrong to me. I would stick with "Finalize". 
The code is to flush any data to sstable by closing the writer. Cleanup leads 
me to think it remove files.  



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java:
##########
@@ -50,13 +51,36 @@ private TokenRangeMappingUtils()
 
     public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(final int initialToken, final ImmutableMap<String, Integer> rfByDC, int instancesPerDC)
     {
+        return buildTokenRangeMapping(initialToken, rfByDC, instancesPerDC, false, -1);
+    }
 
+    public static TokenRangeMapping<RingInstance> buildTokenRangeMappingWithFailures(int initialToken,
+                                                                                     ImmutableMap<String, Integer> rfByDC,
+                                                                                     int instancesPerDC)
+    {
         final List<RingInstance> instances = getInstances(initialToken, rfByDC, instancesPerDC);
+        RingInstance instance = instances.remove(0);
+        RingEntry entry = instance.getRingInstance();
+        RingEntry newEntry = new RingEntry.Builder()
+                             .datacenter(entry.datacenter())
+                             .port(entry.port())
+                             .address(entry.address())
+                             .status(InstanceStatus.DOWN.name())
+                             .state(entry.state())
+                             .token(entry.token())
+                             .fqdn(entry.fqdn())
+                             .rack(entry.rack())
+                             .owns(entry.owns())
+                             .load(entry.load())
+                             .hostId(entry.hostId())
+                             .build();
+        RingInstance newInstance = new RingInstance(newEntry);
+        instances.add(0, newInstance);
         ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
         Map<String, Set<String>> writeReplicas =
-        instances.stream()
-                 .collect(Collectors.groupingBy(RingInstance::getDataCenter,
-                                                Collectors.mapping(RingInstance::getNodeName, Collectors.toSet())));
+        instances.stream().collect(Collectors.groupingBy(RingInstance::getDataCenter,
+                                                         Collectors.mapping(RingInstance::getNodeName,
+                                                                            Collectors.toSet())));

Review Comment:
   nit: prefer not to reformat when there is no actual code change.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java:
##########
@@ -23,103 +23,120 @@
 import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 
-public class BulkWriteValidator implements AutoCloseable
+public class BulkWriteValidator
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteValidator.class);
 
     private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
-    private final CassandraRingMonitor monitor;
     private final JobInfo job;
     private String phase = "Initializing";
 
     private final ClusterInfo cluster;
 
     public BulkWriteValidator(BulkWriterContext bulkWriterContext,
-                              Consumer<CancelJobEvent> cancelJobFunc) throws Exception
+                              ReplicaAwareFailureHandler<RingInstance> failureHandler)
     {
         cluster = bulkWriterContext.cluster();
         job = bulkWriterContext.job();
-        failureHandler = new ReplicaAwareFailureHandler<>(cluster.getRing(true));
-        monitor = new CassandraRingMonitor(cluster, cancelJobFunc, 1000, TimeUnit.MILLISECONDS);
+        this.failureHandler = failureHandler;
     }
 
-    public void setPhase(String phase)
-    {
-        this.phase = phase;
-    }
-
-    public String getPhase()
-    {
-        return phase;
-    }
-
-    public void validateInitialEnvironment()
-    {
-        validateCLOrFail();
-    }
-
-    public void close()
-    {
-        monitor.stop();
-    }
-
-    private void validateCLOrFail()
-    {
-        updateInstanceAvailability();
-        validateClOrFail(failureHandler, LOGGER, phase, job);
-    }
-
-    public static void validateClOrFail(ReplicaAwareFailureHandler<RingInstance> failureHandler,
+    public static void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping,
+                                        ReplicaAwareFailureHandler<RingInstance> failureHandler,
                                         Logger logger,
                                         String phase,
                                         JobInfo job)
     {
         Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<RingInstance, String>>> failedRanges =
-                failureHandler.getFailedEntries(job.getConsistencyLevel(), job.getLocalDC());
+        failureHandler.getFailedEntries(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
+
         if (failedRanges.isEmpty())
         {
             logger.info("Succeeded {} with {}", phase, job.getConsistencyLevel());
         }
         else
         {
-            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s",
+            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.",
                                            failedRanges.size(), job.getConsistencyLevel(), job.getId(), phase);
             logger.error(message);
-            failedRanges.forEach(failedRange -> failedRange.getValue().keySet().forEach(instance ->
-                    logger.error("Failed {} for {} on {}", phase, failedRange.getKey(), instance.toString())));
+            failedRanges.forEach(failedRange ->
+                                 failedRange.getValue()
+                                            .keySet()
+                                            .forEach(instance ->
+                                                     logger.error("Failed {} for {} on {}",
+                                                                  phase,
+                                                                  failedRange.getKey(),
+                                                                  instance.toString())));
             throw new RuntimeException(message);
         }
     }
 
-    public static void updateFailureHandler(CommitResult commitResult,
-                                            String phase,
+    public String getPhase()
+    {
+        return phase;
+    }
+
+    public void setPhase(String phase)
+    {
+        this.phase = phase;
+    }
+
+    public static void updateFailureHandler(CommitResult commitResult, String phase,
                                             ReplicaAwareFailureHandler<RingInstance> failureHandler)
     {
         LOGGER.debug("Commit Result: {}", commitResult);
         commitResult.failures.forEach((uuid, err) -> {
             LOGGER.warn("[{}]: {} failed on {} with message {}",
-                    uuid, phase, commitResult.instance.getNodeName(), err.errMsg);
+                        uuid,
+                        phase,
+                        commitResult.instance.getNodeName(),
+                        err.errMsg);
             failureHandler.addFailure(err.tokenRange, commitResult.instance, err.errMsg);
         });
     }
 
+    public void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping)
+    {
+        // Updates failures by looking up instance metadata
+        updateInstanceAvailability();
+        // Fails if the failures violate consistency requirements
+        validateClOrFail(tokenRangeMapping, failureHandler, LOGGER, phase, job);
+    }
+
     private void updateInstanceAvailability()
     {
         cluster.refreshClusterInfo();
         Map<RingInstance, InstanceAvailability> availability = cluster.getInstanceAvailability();
         availability.forEach(this::checkInstance);
     }
 
+    private void updateFailureHandler(RingInstance instance, InstanceAvailability availability)

Review Comment:
   This method is only called from `private void checkInstance(RingInstance instance, InstanceAvailability availability)`, which throws an exception when `availability == InstanceAvailability.INVALID_STATE`. That removes the need to check the same condition again in this method.
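   For illustration, a sketch of the caller as described (the body is assumed from the description above, not copied from the PR):
   ```java
   private void checkInstance(RingInstance instance, InstanceAvailability availability)
   {
       if (availability == InstanceAvailability.INVALID_STATE)
       {
           // Throws here, so updateFailureHandler below never sees INVALID_STATE
           throw new RuntimeException(String.format("Instance %s is in an invalid state", instance.getNodeName()));
       }
       updateFailureHandler(instance, availability);
   }
   ```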



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinator.java:
##########
@@ -128,61 +128,51 @@ private Stream<ListenableFuture<CommitResult>> commit(Map<RingInstance, Listenin
                                                          RingInstance instance,
                                                          Map<String, Range<BigInteger>> uploadRanges)
     {
-        if (cluster.instanceIsAvailable(instance))

Review Comment:
   What if the instance is down? The new code will try to commit the SSTables on those down instances. Should it skip contacting the sidecar and mark the commit as a failure directly?
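   A sketch of the kind of guard I have in mind, assuming a `ReplicaAwareFailureHandler` (or an equivalent failure callback) is reachable from `CommitCoordinator`:
   ```java
   if (!cluster.instanceIsAvailable(instance))
   {
       // Skip the sidecar call entirely and record a failure for each upload range
       uploadRanges.values().forEach(range -> failureHandler.addFailure(range, instance, "Instance is down"));
       return Stream.empty();
   }
   ```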



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java:
##########
@@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult streamResult) throws ExecutionExc
     @VisibleForTesting
     List<RingInstance> getReplicas()
     {
-        Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = ring.getSubRanges(tokenRange).asMapOfRanges();
+        List<RingInstance> exclusions = failureHandler.getFailedInstances();
+        final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();

Review Comment:
   drop the `final` for the locally scoped variable.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java:
##########
@@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange, Instance casInstance, Strin
         for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet())
         {
             Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue());
-
             newErrorMap.put(casInstance, errMessage);
             mappingsToAdd.put(entry.getKey(), newErrorMap);
         }
         failedRangesMap.putAll(mappingsToAdd);
     }
 
-    public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC)
+    public List<Instance> getFailedInstances()

Review Comment:
   How about returning a `Set` instead of a `List`? Callers appear to use the result for membership checks, and a set also de-duplicates instances that failed in multiple ranges.
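   A sketch of the `Set`-returning variant, assuming `failedRangesMap` is the range map populated above:
   ```java
   public Set<Instance> getFailedInstances()
   {
       return failedRangesMap.asMapOfRanges()
                             .values()
                             .stream()
                             .flatMap(errors -> errors.keySet().stream())
                             .collect(Collectors.toSet());
   }
   ```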



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -19,19 +19,42 @@
 
 package org.apache.cassandra.spark.bulkwriter.token;
 
-import java.util.Collection;
+import java.util.Set;
 
-import com.google.common.base.Preconditions;
-
-import org.apache.cassandra.spark.common.model.CassandraInstance;
-import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public interface ConsistencyLevel
 {
     boolean isLocal();
 
-    boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts, ReplicationFactor replicationFactor, String localDC);
-
+    Logger LOGGER = LoggerFactory.getLogger(ConsistencyLevel.class);
+
+    /**
+     * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor.
+     * <pre>
+     * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes.
+     *   This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy.
+     * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy
+     *   (and non-pending) nodes.
+     *
+     *   For both the above cases, blocked instances are also considered as failures while performing consistency checks.
+     *   Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances.
+     *   This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes.
+     * </pre>
+     *
+     * TODO javadocs

Review Comment:
   Is the TODO addressed? If the javadoc above is now complete, the `TODO javadocs` line can be removed.



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -69,64 +78,217 @@ public class RecordWriterTest
     @BeforeEach
     public void setUp()
     {
-        tw = new MockTableWriter(folder);
-        ring = RingUtils.buildRing(0, "DC1", "test", 12);
-        writerContext = new MockBulkWriterContext(ring);
+        tw = new MockTableWriter(folder.getRoot());
+        tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12);
+        writerContext = new MockBulkWriterContext(tokenRangeMapping);
         tc = new TestTaskContext();
         range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
         tokenizer = new Tokenizer(writerContext);
     }
 
     @Test
-    public void testSuccessfulWrite()
+    public void testWriteFailWhenTopologyChangeWithinTask()
+    {
+        // Generate token range mapping to simulate node movement of the first node by assigning it a different token
+        // within the same partition
+        int moveTargetToken = 50000;
+        TokenRangeMapping<RingInstance> testMapping =
+        TokenRangeMappingUtils.buildTokenRangeMapping(100000,
+                                                      ImmutableMap.of("DC1", 3),
+                                                      12,
+                                                      true,
+                                                      moveTargetToken);
+
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+        when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data));
+        assertThat(ex.getMessage(), endsWith("Token range mappings have changed since the task started"));
+    }
+
+    @Test
+    public void testWriteWithExclusions()
+    {
+        TokenRangeMapping<RingInstance> testMapping =
+        TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000,
+                                                      ImmutableMap.of("DC1", 3),
+                                                      12);
+
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+        when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
+        when(m.getInstanceAvailability()).thenCallRealMethod();
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
+    }
+
+    @Test
+    public void testSuccessfulWrite() throws InterruptedException
     {
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithConstantTTL()
+    public void testSuccessfulWriteCheckUploads()
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur : requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
+    @Test
+    public void testWriteWithConstantTTL() throws InterruptedException
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithTTLColumn()
+    public void testWriteWithTTLColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false);
-        String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+        String[] columnNamesWithTtl =
+        {
+        "id", "date", "course", "marks", "ttl"
+        };
         validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
     }
 
     @Test
-    public void testWriteWithConstantTimestamp()
+    public void testWriteWithConstantTimestamp() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithTimestampColumn()
+    public void testWriteWithTimestampColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true);
-        String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"};
+        String[] columnNamesWithTimestamp =
+        {
+        "id", "date", "course", "marks", "timestamp"
+        };
         validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp);
     }
 
     @Test
-    public void testWriteWithTimestampAndTTLColumn()
+    public void testWriteWithTimestampAndTTLColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true);
-        String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"};
+        String[] columnNames =
+        {
+        "id", "date", "course", "marks", "ttl", "timestamp"
+        };
         validateSuccessfulWrite(bulkWriterContext, data, columnNames);
     }
 
+    @Test
+    public void testWriteWithSubRanges()
+    {
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class);
+        when(m.job().getTokenPartitioner()).thenReturn(mtp);
+
+        // Override partition's token range to span across ranges to force a split into sub-ranges
+        Range<BigInteger> overlapRange = Range.closed(BigInteger.valueOf(-9223372036854775808L), BigInteger.valueOf(200000));
+        when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange);
+
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        List<StreamResult> res = rw.write(data);
+        assertEquals(1, res.size());
+        assertNotEquals(overlapRange, res.get(0).tokenRange);
+        final Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads();
+        // Should upload to 3 replicas
+        assertEquals(uploads.keySet().size(), REPLICA_COUNT);
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur : requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
+    @Test
+    public void testWriteWithSubRanges2()

Review Comment:
   Can the test name be more descriptive? If a comment would help, please add one too.



##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java:
##########
@@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws NoSuchElementException
          * @return instance meta information
          * @throws NoSuchElementException when the instance for {@code host} does not exist
          */
+        @Override
         public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException
         {
-            return cassandraTestContext.instancesConfig.instanceFromHost(host);
+            return cassandraTestContext.instancesConfig().instanceFromHost(host);
         }
     }
 
     @Provides
     @Singleton
     public SidecarConfiguration sidecarConfiguration()
     {
-        return new SidecarConfigurationImpl(new ServiceConfigurationImpl("127.0.0.1"));
+        ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+                                                            .host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long

Review Comment:
   Is it required to bind to `0.0.0.0`?
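   If not, binding the test sidecar to loopback avoids the exposure. A sketch (the rest of the builder chain is elided, and the terminal `build()` call is assumed):
   ```java
   ServiceConfiguration conf = ServiceConfigurationImpl.builder()
                                                       .host("127.0.0.1") // loopback only; unreachable from other hosts
                                                       // ... remaining builder calls unchanged ...
                                                       .build();
   ```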



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java:
##########
@@ -82,6 +106,7 @@ public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(final int i
                                        Collections.emptySet());
     }
 
+

Review Comment:
   nit: remove duplicated empty lines



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -346,19 +366,22 @@ void writeBuffered()
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
-                                         String[] columnNames)
+                                         String[] columnNames) throws InterruptedException
     {
         validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
     }
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames,
-                                         int uploadedTables)
+                                         int uploadedTables) throws InterruptedException
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
+        // Wait for uploads to finish
+        Thread.sleep(500);
         Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        System.out.println("Uploads in test: " + uploads.values().stream().mapToInt(List::size).sum());

Review Comment:
   remove it or use a logger.
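   For example, assuming the test class keeps an SLF4J logger:
   ```java
   LOGGER.debug("Uploads in test: {}", uploads.values().stream().mapToInt(List::size).sum());
   ```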



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -98,39 +95,19 @@ public void insert(@NotNull Dataset<Row> data, boolean overwrite)
         Tokenizer tokenizer = new Tokenizer(writerContext);
         TableSchema tableSchema = writerContext.schema().getTableSchema();
         JavaPairRDD<DecoratedKey, Object[]> sortedRDD = data.toJavaRDD()
-                .map(Row::toSeq)
-                .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
-                .map(tableSchema::normalize)
-                .keyBy(tokenizer::getDecoratedKey)
-                .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
+                                                            .map(Row::toSeq)
+                                                            .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
+                                                            .map(tableSchema::normalize)
+                                                            .keyBy(tokenizer::getDecoratedKey)
+                                                            .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());

Review Comment:
   The original indentation was chosen for readability.
   
   This reformatting makes no change other than increasing the indentation. Such reformatting is not preferred, as it creates noise for review.



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -346,19 +366,22 @@ void writeBuffered()
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
-                                         String[] columnNames)
+                                         String[] columnNames) throws InterruptedException
     {
         validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
     }
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames,
-                                         int uploadedTables)
+                                         int uploadedTables) throws InterruptedException
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
+        // Wait for uploads to finish
+        Thread.sleep(500);

Review Comment:
   Using `sleep` makes the test prone to flakiness, since the required wait depends heavily on the test runtime. We should do our best to avoid it.
   `rw.write` is already blocking: it does not return until the SSTables are uploaded, so I am not sure why the sleep is needed.
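   If some wait really were needed, a bounded poll would be less flaky than a fixed sleep. A sketch, assuming `getUploads()` can be polled repeatedly (`expectedUploads` and the 5-second cap are illustrative):
   ```java
   long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
   while (writerContext.getUploads().values().stream().mapToInt(List::size).sum() < expectedUploads
          && System.nanoTime() < deadlineNanos)
   {
       Thread.sleep(50); // short poll interval; the loop exits as soon as the uploads are visible
   }
   ```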



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
        }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        List<ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
        {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+                                        .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+                                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                                  (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+            pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+            if (LOGGER.isDebugEnabled())
             {
-                try
-                {
-                    ringResponse = getCurrentRingResponse();
-                }
-                catch (Exception exception)
-                {
-                    LOGGER.error("Failed to load Cassandra ring", exception);
-                    throw new RuntimeException(exception);
-                }
+                LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
+                             writeReplicasByDC.keySet(),
+                             writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                             pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
             }
-            return ringResponse;
         }
-    }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges, ", exception);
+            throw new RuntimeException(exception);
+        }
 
-    private RingResponse getCurrentRingResponse() throws Exception
-    {
-        return getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        // Include availability info so CL checks can use it to exclude replacement hosts
+        return new TokenRangeMapping<>(getPartitioner(),
+                                       getReplicationFactor(),
+                                       writeReplicasByDC,
+                                       pendingReplicasByDC,
+                                       tokenRangesByInstance,
+                                       replicaMetadata,
+                                       blockedInstances,
+                                       replacementInstances);
     }
 
-    private static List<RingInstance> getSerializableInstances(RingResponse ringResponse)
+    private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
     {
-        return ringResponse.stream()
-                           .map(RingInstance::new)
-                           .collect(Collectors.toList());
+        Set<String> merged = new HashSet<>();
+        // Removes blocked instances. If this is included, remove blockedInstances from CL checks
+        merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+        merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+        return merged;
     }
 
-    private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+    // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+    // This will be replaced by the instance state metadata when it is supported by the token-ranges API
+    private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
     {
-        return new RingInstance(ringEntry);
+        Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
+        return writeReplicasByDC.entrySet()
+                                .stream()
+                                .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
-    protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+    private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+                                                                               List<ReplicaMetadata> replicaMetadata)
     {
-        GossipInfoResponse currentGossipInfo = gossipInfo;
-        if (!forceRefresh && currentGossipInfo != null)
-        {
-            return currentGossipInfo;
-        }
-
-        synchronized (this)
+        Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create();
+        for (ReplicaInfo rInfo : writeReplicas)
         {
-            if (forceRefresh || gossipInfo == null)
+            Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+            for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet())
             {
-                try
-                {
-                    gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-                                                                                      TimeUnit.MILLISECONDS);
-                }
-                catch (ExecutionException | InterruptedException exception)
-                {
-                    LOGGER.error("Failed to retrieve gossip information");
-                    throw new RuntimeException("Failed to retrieve gossip information", exception);
-                }
-                catch (TimeoutException exception)
-                {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Failed to retrieve gossip information", exception);
-                }
+                // For each writeReplica, get metadata and update map to include range
+                dcReplicaEntry.getValue().forEach(ipAddress -> {
+                    // Get metadata for this IP; Create RingInstance
+                    // TODO: Temporary change to extract IP from 'ip:port' string. This will go away once
+                    // the corresponding change in sidecar is merged.
+                    ReplicaMetadata replica = replicaMetadata.stream()
+                                                             .filter(r ->
+                                                                     r.address().equals(ipAddress.split(":")[0]))

Review Comment:
   What if it is an IPv6 address? `split(":")[0]` would keep only the first colon-separated group and corrupt the host.
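   A sketch of host extraction that survives IPv6 literals (assumes IPv6 `ip:port` values use the bracketed `[host]:port` form; adjust if the sidecar formats them differently):
   ```java
   private static String extractHost(String address)
   {
       int lastColon = address.lastIndexOf(':');
       // No port present, or an unbracketed IPv6 literal: return the address unchanged
       if (lastColon < 0 || (address.indexOf(':') != lastColon && !address.contains("]")))
       {
           return address;
       }
       String host = address.substring(0, lastColon);
       // Strip the brackets from "[host]:port"
       if (host.startsWith("[") && host.endsWith("]"))
       {
           host = host.substring(1, host.length() - 1);
       }
       return host;
   }
   ```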



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        List<ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
        {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+                                        .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+                                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                                  (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+            pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+            if (LOGGER.isDebugEnabled())
             {
-                try
-                {
-                    ringResponse = getCurrentRingResponse();
-                }
-                catch (Exception exception)
-                {
-                    LOGGER.error("Failed to load Cassandra ring", exception);
-                    throw new RuntimeException(exception);
-                }
+                LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
+                             writeReplicasByDC.keySet(),
+                             writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                             pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
             }
-            return ringResponse;
         }
-    }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges, ", exception);
+            throw new RuntimeException(exception);
+        }
 
-    private RingResponse getCurrentRingResponse() throws Exception
-    {
-        return getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        // Include availability info so CL checks can use it to exclude replacement hosts
+        return new TokenRangeMapping<>(getPartitioner(),
+                                       getReplicationFactor(),
+                                       writeReplicasByDC,
+                                       pendingReplicasByDC,
+                                       tokenRangesByInstance,
+                                       replicaMetadata,
+                                       blockedInstances,
+                                       replacementInstances);
     }
 
-    private static List<RingInstance> getSerializableInstances(RingResponse ringResponse)
+    private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
     {
-        return ringResponse.stream()
-                           .map(RingInstance::new)
-                           .collect(Collectors.toList());
+        Set<String> merged = new HashSet<>();
+        // Removes blocked instances. If this is included, remove blockedInstances from CL checks
+        merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+        merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+        return merged;
     }
 
-    private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+    // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+    // This will be replaced by the instance state metadata when it is supported by the token-ranges API

Review Comment:
   Replica metadata is exposed from the `TokenRangeReplicasResponse` now. Is the implementation outdated?
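   If so, a sketch of deriving pending replicas from the exposed metadata instead (assumes `ReplicaMetadata` exposes `datacenter()` and that pending nodes report a non-NORMAL state):
   ```java
   private Map<String, Set<String>> pendingReplicasByDC(List<ReplicaMetadata> replicaMetadata)
   {
       // Pending = nodes in a transitional state (e.g. JOINING/LEAVING/MOVING) rather than NORMAL
       return replicaMetadata.stream()
                             .filter(m -> !m.state().equalsIgnoreCase("NORMAL"))
                             .collect(Collectors.groupingBy(ReplicaMetadata::datacenter,
                                                            Collectors.mapping(ReplicaMetadata::address,
                                                                               Collectors.toSet())));
   }
   ```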



##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations;
+import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.spark.KryoRegister;
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
+import org.apache.cassandra.spark.bulkwriter.Tokenizer;
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.common.schema.ColumnTypes;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple2;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for resiliency tests. Contains helper methods for data generation and validation
+ */
+public abstract class ResiliencyTestBase extends IntegrationTestBase
+{
+    private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));";
+    protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s";
+    public static final int rowCount = 1000;
+
+    public QualifiedTableName initializeSchema()
+    {
+        return initializeSchema(ImmutableMap.of("datacenter1", 1));
+    }
+
+    public QualifiedTableName initializeSchema(Map<String, Integer> rf)
+    {
+        createTestKeyspace(rf);
+        return createTestTable(createTableStmt);
+    }
+
+    public SparkConf generateSparkConf()
+    {
+        SparkConf sparkConf = new SparkConf()
+                              .setAppName("Integration test Spark Cassandra 
Bulk Reader Job")
+                              .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+                              .set("spark.master", "local[8,4]");
+        BulkSparkConf.setupSparkConf(sparkConf, true);
+        KryoRegister.setup(sparkConf);
+        return sparkConf;
+    }
+
+    public SparkSession generateSparkSession(SparkConf sparkConf)
+    {
+        return SparkSession.builder()
+                           .config(sparkConf)
+                           .getOrCreate();
+    }
+
+    public Set<String> getDataForRange(Range<BigInteger> range)
+    {
+        // Iterate through all data entries; filter only entries that belong to range; convert to strings
+        return generateExpectedData().stream()
+                   .filter(t -> range.contains(t._1().getToken()))
+                   .map(t -> t._2()[0] + ":" + t._2()[1] + ":" + t._2()[2])
+                   .collect(Collectors.toSet());
+    }
+
+    public List<Tuple2<DecoratedKey, Object[]>> generateExpectedData()
+    {
+        // "create table if not exists %s (id int, course text, marks int, 
primary key (id));";
+        List<ColumnType<?>> columnTypes = Arrays.asList(ColumnTypes.INT);
+        Tokenizer tokenizer = new Tokenizer(Arrays.asList(0),
+                                            Arrays.asList("id"),
+                                            columnTypes,
+                                            true
+        );
+        return IntStream.range(0, rowCount).mapToObj(recordNum -> {
+            Object[] columns = new Object[]
+                               {
+                               recordNum, "course" + recordNum, recordNum
+                               };
+            return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
+        }).collect(Collectors.toList());
+    }
+
+    public Map<IUpgradeableInstance, Set<String>> getInstanceData(List<IUpgradeableInstance> instances,
+                                                                  boolean isPending)
+    {
+
+        return instances.stream().collect(Collectors.toMap(Function.identity(),
+                                                           i -> filterTokenRangeData(getRangesForInstance(i, isPending))));
+    }
+
+    public Set<String> filterTokenRangeData(List<Range<BigInteger>> ranges)
+    {
+        return ranges.stream()
+                 .map(r -> getDataForRange(r))
+                 .flatMap(Collection::stream)
+                 .collect(Collectors.toSet());
+    }
+
+    private List<Range<BigInteger>> getRangesForInstance(IUpgradeableInstance instance, boolean isPending)
+    {
+        IInstanceConfig config = instance.config();
+        JmxClient client = JmxClient.builder()
+                                    .host(config.broadcastAddress().getAddress().getHostAddress())
+                                    .port(config.jmxPort())
+                                    .build();
+        StorageJmxOperations ss = client.proxy(StorageJmxOperations.class, "org.apache.cassandra.db:type=StorageService");
+
+        Map<List<String>, List<String>> ranges = isPending ? ss.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE)
+                                                           : ss.getRangeToEndpointWithPortMap(TEST_KEYSPACE);
+
+        // filter ranges that belong to the instance
+        return ranges.entrySet()
+                            .stream()
+                            .filter(e -> e.getValue().contains(instance.broadcastAddress().getAddress().getHostAddress()
+                                                               + ":" + instance.broadcastAddress().getPort()))
+                            .map(e -> unwrapRanges(e.getKey()))
+                            .flatMap(Collection::stream)
+                            .collect(Collectors.toList());

Review Comment:
   nit: the chained calls are not aligned with the receiver.
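   
   e.g. (illustrative only):
   
   ```suggestion
           return ranges.entrySet()
                        .stream()
                        .filter(e -> e.getValue().contains(instance.broadcastAddress().getAddress().getHostAddress()
                                                           + ":" + instance.broadcastAddress().getPort()))
                        .map(e -> unwrapRanges(e.getKey()))
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());
   ```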



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        List<ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
         {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+                                        .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+                                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                                  (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+            pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+            if (LOGGER.isDebugEnabled())
             {
-                try
-                {
-                    ringResponse = getCurrentRingResponse();
-                }
-                catch (Exception exception)
-                {
-                    LOGGER.error("Failed to load Cassandra ring", exception);
-                    throw new RuntimeException(exception);
-                }
+                LOGGER.debug("Fetched token-ranges with dcs={}, 
write_replica_count={}, pending_replica_count={}",
+                             writeReplicasByDC.keySet(),
+                             
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                             
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
             }
-            return ringResponse;
         }
-    }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges, ", exception);
+            throw new RuntimeException(exception);
+        }
 
-    private RingResponse getCurrentRingResponse() throws Exception
-    {
-        return getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        // Include availability info so CL checks can use it to exclude replacement hosts
+        return new TokenRangeMapping<>(getPartitioner(),
+                                       getReplicationFactor(),
+                                       writeReplicasByDC,
+                                       pendingReplicasByDC,
+                                       tokenRangesByInstance,
+                                       replicaMetadata,
+                                       blockedInstances,
+                                       replacementInstances);
     }
 
-    private static List<RingInstance> getSerializableInstances(RingResponse ringResponse)
+    private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
     {
-        return ringResponse.stream()
-                           .map(RingInstance::new)
-                           .collect(Collectors.toList());
+        Set<String> merged = new HashSet<>();
+        // Removes blocked instances. If this is included, remove blockedInstances from CL checks
+        merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+        merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+        return merged;
     }
 
-    private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+    // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+    // This will be replaced by the instance state metadata when it is supported by the token-ranges API
+    private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
     {
-        return new RingInstance(ringEntry);
+        Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
+        return writeReplicasByDC.entrySet()
+                                .stream()
+                                .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
-    protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+    private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+                                                                               List<ReplicaMetadata> replicaMetadata)
     {
-        GossipInfoResponse currentGossipInfo = gossipInfo;
-        if (!forceRefresh && currentGossipInfo != null)
-        {
-            return currentGossipInfo;
-        }
-
-        synchronized (this)
+        Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create();
+        for (ReplicaInfo rInfo : writeReplicas)
         {
-            if (forceRefresh || gossipInfo == null)
+            Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+            for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet())
             {
-                try
-                {
-                    gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-                                                                                      TimeUnit.MILLISECONDS);
-                }
-                catch (ExecutionException | InterruptedException exception)
-                {
-                    LOGGER.error("Failed to retrieve gossip information");
-                    throw new RuntimeException("Failed to retrieve gossip 
information", exception);
-                }
-                catch (TimeoutException exception)
-                {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Failed to retrieve gossip 
information", exception);
-                }
+                // For each writeReplica, get metadata and update map to include range
+                dcReplicaEntry.getValue().forEach(ipAddress -> {
+                    // Get metadata for this IP; Create RingInstance
+                    // TODO: Temporary change to extract IP from 'ip:port' string. This will go away once
+                    // the corresponding change in sidecar is merged.

Review Comment:
   Is the TODO resolved now?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        List<ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
         {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+                                        .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+                                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                                  (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+            pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+            if (LOGGER.isDebugEnabled())
             {
-                try
-                {
-                    ringResponse = getCurrentRingResponse();
-                }
-                catch (Exception exception)
-                {
-                    LOGGER.error("Failed to load Cassandra ring", exception);
-                    throw new RuntimeException(exception);
-                }
+                LOGGER.debug("Fetched token-ranges with dcs={}, 
write_replica_count={}, pending_replica_count={}",
+                             writeReplicasByDC.keySet(),
+                             
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                             
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
             }
-            return ringResponse;
         }
-    }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges, ", exception);
+            throw new RuntimeException(exception);
+        }
 
-    private RingResponse getCurrentRingResponse() throws Exception
-    {
-        return getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        // Include availability info so CL checks can use it to exclude replacement hosts
+        return new TokenRangeMapping<>(getPartitioner(),
+                                       getReplicationFactor(),
+                                       writeReplicasByDC,
+                                       pendingReplicasByDC,
+                                       tokenRangesByInstance,
+                                       replicaMetadata,
+                                       blockedInstances,
+                                       replacementInstances);
     }
 
-    private static List<RingInstance> getSerializableInstances(RingResponse ringResponse)
+    private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
     {
-        return ringResponse.stream()
-                           .map(RingInstance::new)
-                           .collect(Collectors.toList());
+        Set<String> merged = new HashSet<>();
+        // Removes blocked instances. If this is included, remove blockedInstances from CL checks
+        merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+        merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+        return merged;
     }
 
-    private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+    // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+    // This will be replaced by the instance state metadata when it is supported by the token-ranges API
+    private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
     {
-        return new RingInstance(ringEntry);
+        Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
+        return writeReplicasByDC.entrySet()
+                                .stream()
+                                .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains))

Review Comment:
   If my understanding is correct, this method is meant to find the pending nodes. If so, the filter, which keeps a DC's instance list only when none of its instances is contained in `readReplicas`, always returns `false`. The predicate feels wrong to me.
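   
   A sketch of what I would expect instead (illustrative, not this PR's code): keep every DC entry but retain only the write replicas that are absent from `readReplicas`, i.e. the pending ones:
   
   ```java
   private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response,
                                                       Map<String, Set<String>> writeReplicasByDC)
   {
       Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
       // For each DC, keep only the write replicas that are not read replicas
       return writeReplicasByDC.entrySet()
                               .stream()
                               .collect(Collectors.toMap(Map.Entry::getKey,
                                                         e -> e.getValue().stream()
                                                               .filter(r -> !readReplicas.contains(r))
                                                               .collect(Collectors.toSet())));
   }
   ```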



##########
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningBaseTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.expansion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestTokenSupplier;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class JoiningBaseTest extends ResiliencyTestBase

Review Comment:
   should it be an abstract class?
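   
   e.g.:
   
   ```suggestion
   public abstract class JoiningBaseTest extends ResiliencyTestBase
   ```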



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -175,30 +324,31 @@ public void writeRow(Map<String, Object> valueMap,
         }
     }
 
-    void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
+    /**
+     * Stream to replicas; if batchSize is reached, "finalize" SST to "schedule" streamSession
+     */
+    private void checkBatchSize(final StreamSession streamSession, final int partitionId, final JobInfo job) throws IOException

Review Comment:
   drop unnecessary `final` in the method parameters.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -136,35 +181,139 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         }
     }
 
+    private Map<Range<BigInteger>, List<RingInstance>> taskTokenRangeMapping(TokenRangeMapping<RingInstance> tokenRange,
+                                                                             Range<BigInteger> taskTokenRange)
+    {
+        return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges();
+    }
+
+    private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingInstance>> mapping)
+    {
+        return mapping.values()
+                      .stream()
+                      .flatMap(Collection::stream)
+                      .collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates a new session if we have the current token range intersecting the ranges from write replica-set.
+     * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range
+     * if the token from the row data belongs to the range.
+     */
+    private StreamSession maybeCreateStreamSession(TaskContext taskContext,
+                                                   StreamSession streamSession,
+                                                   Tuple2<DecoratedKey, Object[]> rowData,
+                                                   Set<Range<BigInteger>> subRanges,
+                                                   ReplicaAwareFailureHandler<RingInstance> failureHandler,
+                                                   List<StreamResult> results)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        BigInteger token = rowData._1().getToken();
+        Range<BigInteger> tokenRange = getTokenRange(taskContext);
+
+        Preconditions.checkState(tokenRange.contains(token),
+                                 String.format("Received Token %s outside of expected range %s", token, tokenRange));
+
+        // We have split ranges likely resulting from pending nodes
+        // Evaluate creating a new session if the token from current row is part of a sub-range
+        if (subRanges.size() > 1)
+        {
+            // Create session using sub-range that contains the token from current row
+            Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+            Preconditions.checkState(matchingSubRange != null,
+                                     String.format("Received Token %s outside of expected range %s", token, matchingSubRange));
+            streamSession = maybeCreateSubRangeSession(taskContext, streamSession, failureHandler, results, matchingSubRange);
+        }
+
+        // If we do not have any stream session at this point, we create a session using the partition's token range
+        return (streamSession == null) ? createStreamSession(taskContext) : streamSession;
+    }
+
+    /**
+     * Given that the token belongs to a sub-range, creates a new stream session if either
+     * 1) we do not have an existing stream session, or 2) the existing stream session corresponds to a range that
+     * does NOT match the sub-range the token belongs to.
+     */
+    private StreamSession maybeCreateSubRangeSession(TaskContext taskContext,
+                                                     StreamSession streamSession,
+                                                     ReplicaAwareFailureHandler<RingInstance> failureHandler,
+                                                     List<StreamResult> results,
+                                                     Range<BigInteger> matchingSubRange)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        if (streamSession == null || streamSession.getTokenRange() != matchingSubRange)
+        {
+            LOGGER.debug("[{}] Creating stream session for range: {}", 
taskContext.partitionId(), matchingSubRange);
+            // Schedule data to be sent if we are processing a batch that has 
not been scheduled yet.
+            if (streamSession != null)
+            {
+                // Complete existing batched writes (if any) before the existing stream session is closed
+                if (batchSize != 0)
+                {
+                    finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize);
+                    sstableWriter = null;
+                    batchSize = 0;
+                }
+                results.add(streamSession.close());
+            }
+            streamSession = new StreamSession(writerContext, getStreamId(taskContext), matchingSubRange, failureHandler);
+        }
+        return streamSession;
+    }
+
+    /**
+     * Get ranges from the set that intersect and/or overlap with the provided token range
+     */
+    private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges, Range<BigInteger> tokenRange)
+    {
+        return ranges.stream()
+                     .filter(r -> r.isConnected(tokenRange) && !r.intersection(tokenRange).isEmpty())
+                     .collect(Collectors.toSet());
+    }
+
+    private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping, TaskContext taskContext)
+    {
+        Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
+        // Get the uncached, current view of the ring to compare with initial ring
+        TokenRangeMapping<RingInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false);
+        Map<Range<BigInteger>, List<RingInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange);
+        Map<Range<BigInteger>, List<RingInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange);
+
+        // Token ranges are identical and overall instance list is same
+        return !(startMapping.keySet().equals(endMapping.keySet()) &&
+               instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping)));
+    }
+
     private void validateAcceptableTimeSkewOrThrow(List<RingInstance> replicas)
     {
-        TimeSkewResponse timeSkewResponse = writerContext.cluster().getTimeSkew(replicas);
-        Instant localNow = Instant.now();
-        Instant remoteNow = Instant.ofEpochMilli(timeSkewResponse.currentTime);
-        Duration range = Duration.ofMinutes(timeSkewResponse.allowableSkewInMinutes);
-        if (localNow.isBefore(remoteNow.minus(range)) || localNow.isAfter(remoteNow.plus(range)))
+        if (!replicas.isEmpty())
         {
-            String message = String.format("Time skew between Spark and 
Cassandra is too large. "
-                                           + "Allowable skew is %d minutes. "
-                                           + "Spark executor time is %s, 
Cassandra instance time is %s",
-                                           
timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow);
-            throw new UnsupportedOperationException(message);
+            TimeSkewResponse timeSkewResponse = 
writerContext.cluster().getTimeSkew(replicas);
+            Instant localNow = Instant.now();
+            Instant remoteNow = 
Instant.ofEpochMilli(timeSkewResponse.currentTime);
+            Duration range = 
Duration.ofMinutes(timeSkewResponse.allowableSkewInMinutes);
+            if (localNow.isBefore(remoteNow.minus(range)) || 
localNow.isAfter(remoteNow.plus(range)))
+            {
+                final String message = String.format("Time skew between Spark 
and Cassandra is too large. "
+                                                     + "Allowable skew is %d 
minutes. Spark executor time is %s, Cassandra instance time is %s",
+                                                     
timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow);
+                throw new UnsupportedOperationException(message);
+            }

Review Comment:
   looks like it only adds the check for whether `replicas` is empty. The following has less indentation.
   
   ```java
       private void validateAcceptableTimeSkewOrThrow(List<RingInstance> replicas)
       {
           if (replicas.isEmpty())
           {
               return;
           }
           // original logic
       }
   ```



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -110,20 +133,42 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
+            Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+                                                                       .stream()
+                                                                       .map(Map.Entry::getKey)
+                                                                       .collect(Collectors.toSet());

Review Comment:
   nit: it is simpler to get a copy of the key set like this
   
   ```suggestion
               Set<Range<BigInteger>> newRanges = new HashSet<>(initialTokenRangeMapping.getRangeMap().asMapOfRanges().keySet());
   ```



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##########
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        List<ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
         {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+                                        .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+                                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                                  (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+            pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+            if (LOGGER.isDebugEnabled())
             {
-                try
-                {
-                    ringResponse = getCurrentRingResponse();
-                }
-                catch (Exception exception)
-                {
-                    LOGGER.error("Failed to load Cassandra ring", exception);
-                    throw new RuntimeException(exception);
-                }
+                LOGGER.debug("Fetched token-ranges with dcs={}, 
write_replica_count={}, pending_replica_count={}",
+                             writeReplicasByDC.keySet(),
+                             
writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                             
pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
             }
-            return ringResponse;
         }
-    }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges, ", exception);
+            throw new RuntimeException(exception);
+        }
 
-    private RingResponse getCurrentRingResponse() throws Exception
-    {
-        return getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        // Include availability info so CL checks can use it to exclude replacement hosts
+        return new TokenRangeMapping<>(getPartitioner(),
+                                       getReplicationFactor(),
+                                       writeReplicasByDC,
+                                       pendingReplicasByDC,
+                                       tokenRangesByInstance,
+                                       replicaMetadata,
+                                       blockedInstances,
+                                       replacementInstances);
     }
 
-    private static List<RingInstance> getSerializableInstances(RingResponse ringResponse)
+    private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
     {
-        return ringResponse.stream()
-                           .map(RingInstance::new)
-                           .collect(Collectors.toList());
+        Set<String> merged = new HashSet<>();
+        // Removes blocked instances. If this is included, remove blockedInstances from CL checks
+        merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+        merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+        return merged;
     }
 
-    private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+    // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+    // This will be replaced by the instance state metadata when it is supported by the token-ranges API
+    private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
     {
-        return new RingInstance(ringEntry);
+        Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
+        return writeReplicasByDC.entrySet()
+                                .stream()
+                                .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
-    protected GossipInfoResponse getGossipInfo(boolean forceRefresh)
+    private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+                                                                               List<ReplicaMetadata> replicaMetadata)
     {
-        GossipInfoResponse currentGossipInfo = gossipInfo;
-        if (!forceRefresh && currentGossipInfo != null)
-        {
-            return currentGossipInfo;
-        }
-
-        synchronized (this)
+        Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create();
+        for (ReplicaInfo rInfo : writeReplicas)
         {
-            if (forceRefresh || gossipInfo == null)
+            Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end()));
+            for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet())
             {
-                try
-                {
-                    gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(),
-                                                                                      TimeUnit.MILLISECONDS);
-                }
-                catch (ExecutionException | InterruptedException exception)
-                {
-                    LOGGER.error("Failed to retrieve gossip information");
-                    throw new RuntimeException("Failed to retrieve gossip 
information", exception);
-                }
-                catch (TimeoutException exception)
-                {
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException("Failed to retrieve gossip 
information", exception);
-                }
+                // For each writeReplica, get metadata and update map to include range
+                dcReplicaEntry.getValue().forEach(ipAddress -> {
+                    // Get metadata for this IP; Create RingInstance
+                    // TODO: Temporary change to extract IP from 'ip:port' string. This will go away once
+                    // the corresponding change in sidecar is merged.
+                    ReplicaMetadata replica = replicaMetadata.stream()
+                                                             .filter(r -> r.address().equals(ipAddress.split(":")[0]))
+                                                             .findFirst().get();

Review Comment:
   can you handle the potential NoSuchElementException from `get()` gracefully?
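   
   For instance (a sketch; the error message is hypothetical):
   
   ```java
   ReplicaMetadata replica =
       replicaMetadata.stream()
                      .filter(r -> r.address().equals(ipAddress.split(":")[0]))
                      .findFirst()
                      .orElseThrow(() -> new RuntimeException(
                          String.format("No replica metadata found for address %s", ipAddress)));
   ```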



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java:
##########
@@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult streamResult) throws ExecutionExc
     @VisibleForTesting
     List<RingInstance> getReplicas()
     {
-        Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = ring.getSubRanges(tokenRange).asMapOfRanges();
+        List<RingInstance> exclusions = failureHandler.getFailedInstances();
+        final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
 
-        Preconditions.checkState(overlappingRanges.keySet().size() == 1,
-                                 String.format("Partition range %s is mapping 
more than one range %s",
-                                               tokenRange, overlappingRanges));
+        LOGGER.debug("[{}]: Stream session token range: {} overlaps with ring 
ranges: {}", sessionID, tokenRange, overlappingRanges);
+        Preconditions.checkState(!tokenRange.isEmpty(),

Review Comment:
   maybe move this check before getting the `overlappingRanges`, so that `tokenRange` is validated before it is used.
   Or do you mean to check that `overlappingRanges` is not empty?
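   
   i.e. something along these lines (sketch; the message text is illustrative):
   
   ```java
   Preconditions.checkState(!tokenRange.isEmpty(),
                            String.format("Token range %s for the stream session is empty", tokenRange));
   final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges =
       tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
   ```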



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
 
     // In order to best utilize the number of Spark cores while minimizing the number of commit calls,
     // we calculate the number of splits that will just match or exceed the total number of available Spark cores.
-    // NOTE: The actual number of partitions that result from this should always be at least
-    //       the number of token ranges times the number of splits, but can be slightly more.
-    public int calculateSplits(CassandraRing<RingInstance> ring,
+    // Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits,
+    // but can be slightly more.
+    public int calculateSplits(TokenRangeMapping<RingInstance> tokenRangeMapping,
                                Integer numberSplits,
                                int defaultParallelism,
                                Integer cores)
     {
-        if (numberSplits >= 0)
+        if (numberSplits != -1)
         {
             return numberSplits;
         }
-        int tasksToRun = Math.max(cores, defaultParallelism);
-        Map<Range<BigInteger>, List<RingInstance>> rangeListMap = ring.getRangeMap().asMapOfRanges();
+        final int tasksToRun = Math.max(cores, defaultParallelism);
+        final Map<Range<BigInteger>, List<RingInstance>> rangeListMap = tokenRangeMapping.getRangeMap().asMapOfRanges();

Review Comment:
   drop the `final`



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -138,15 +143,15 @@ private void validate()
     private void validateRangesDoNotOverlap()
     {
         List<Range<BigInteger>> sortedRanges = partitionMap.asMapOfRanges().keySet().stream()
-                .sorted(Comparator.comparing(Range::lowerEndpoint))
-                .collect(Collectors.toList());
+                                                           .sorted(Comparator.comparing(Range::lowerEndpoint))
+                                                           .collect(Collectors.toList());
         Range<BigInteger> previous = null;
         for (Range<BigInteger> current : sortedRanges)
         {
             if (previous != null)
             {
                 Preconditions.checkState(!current.isConnected(previous) || current.intersection(previous).isEmpty(),
-                        String.format("Two ranges in partition map are overlapping %s %s", previous, current));
+                                         String.format("Two ranges in partition map are overlapping %s %s", previous, current));

Review Comment:
   since this line is changed, you can use the other `checkState` API to lazily evaluate the error message only when the check fails.
   
   ```suggestion
                                            "Two ranges in partition map are overlapping %s %s", previous, current);
   ```



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
             }
 
             @Override
-            public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts,
-                                            ReplicationFactor replicationFactor,
+            public boolean checkConsistency(Set<String> writeReplicas,
+                                            Set<String> pendingReplicas,
+                                            Set<String> replacementInstances,
+                                            Set<String> blockedInstances,
+                                            Set<String> failedInstanceIps,
                                             String localDC)
             {
-                Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy,
-                                            "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces");
-
-                for (String datacenter : replicationFactor.getOptions().keySet())
-                {
-                    int rf = replicationFactor.getOptions().get(datacenter);
-                    if (failedInsts.stream()
-                                   .filter(instance -> instance.getDataCenter().matches(datacenter))
-                                   .count() > (rf - (rf / 2 + 1)))
-                    {
-                        return false;
-                    }
-                }
-
-                return true;
+                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));

Review Comment:
   The implementation looks wrong to me. For EACH_QUORUM, the success condition 
is to meet quorum in each DC. The replication factor could vary per DC. 
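   
   A sketch of a per-DC quorum check, assuming DC-grouped replica sets were available (the hypothetical parameter `writeReplicasByDC`; the flat sets in the new signature do not carry this grouping):
   
   ```java
   static boolean eachQuorum(Map<String, Set<String>> writeReplicasByDC, Set<String> failedOrBlocked)
   {
       for (Set<String> dcReplicas : writeReplicasByDC.values())
       {
           long failures = dcReplicas.stream().filter(failedOrBlocked::contains).count();
           int quorum = dcReplicas.size() / 2 + 1;
           // Quorum must hold in every DC for EACH_QUORUM to succeed
           if (dcReplicas.size() - failures < quorum)
           {
               return false;
           }
       }
       return true;
   }
   ```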



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
 
     // In order to best utilize the number of Spark cores while minimizing the number of commit calls,
     // we calculate the number of splits that will just match or exceed the total number of available Spark cores.
-    // NOTE: The actual number of partitions that result from this should always be at least
-    //       the number of token ranges times the number of splits, but can be slightly more.
-    public int calculateSplits(CassandraRing<RingInstance> ring,
+    // Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits,
+    // but can be slightly more.
+    public int calculateSplits(TokenRangeMapping<RingInstance> tokenRangeMapping,
                                Integer numberSplits,
                                int defaultParallelism,
                                Integer cores)
     {
-        if (numberSplits >= 0)
+        if (numberSplits != -1)

Review Comment:
   This change is worrisome. There is no validation on the value of `numberSplits`. An end user can provide a wrong value, say `-10`; the original code can handle it, but the changed code breaks.
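
   A sketch of a guard that would preserve the old tolerance (the method and message are illustrative, assuming `-1` is the "unset" sentinel used above):
   ```java
   import com.google.common.base.Preconditions;

   final class SplitsValidation
   {
       // Hypothetical guard: accept the -1 "auto" sentinel or a positive count,
       // and fail fast on anything else (e.g. a user-supplied -10).
       static int validateNumberSplits(int numberSplits)
       {
           Preconditions.checkArgument(numberSplits == -1 || numberSplits > 0,
                                       "numberSplits must be -1 (auto) or positive, but was %s", numberSplits);
           return numberSplits;
       }
   }
   ```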



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java:
##########
@@ -94,6 +95,12 @@ public Object[] normalize(Object[] row)
     }
 
     public Object[] getKeyColumns(Object[] allColumns)
+    {
+        return getKeyColumns(allColumns, keyFieldPositions);
+    }
+
+    @NotNull
+    public static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)

Review Comment:
   What is the benefit of allowing the key field positions to be specified? It looks like the method is only used for testing via the `Tokenizer` constructor. If so, can you add the `@VisibleForTesting` annotation?
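
   For reference, a sketch of what that could look like (assuming Guava's annotation; the body is illustrative):
   ```java
   import java.util.List;

   import com.google.common.annotations.VisibleForTesting;

   final class TableSchemaSketch
   {
       // Hypothetical: the annotation documents that the widened static overload
       // exists only so tests (e.g. via the Tokenizer constructor) can call it.
       @VisibleForTesting
       static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)
       {
           return keyFieldPositions.stream().map(position -> allColumns[position]).toArray();
       }
   }
   ```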



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
             }
 
             @Override
-            public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts,
-                                            ReplicationFactor replicationFactor,
+            public boolean checkConsistency(Set<String> writeReplicas,
+                                            Set<String> pendingReplicas,
+                                            Set<String> replacementInstances,
+                                            Set<String> blockedInstances,
+                                            Set<String> failedInstanceIps,
                                             String localDC)
             {
-                Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy,
-                                            "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces");
-
-                for (String datacenter : replicationFactor.getOptions().keySet())
-                {
-                    int rf = replicationFactor.getOptions().get(datacenter);
-                    if (failedInsts.stream()
-                                   .filter(instance -> instance.getDataCenter().matches(datacenter))
-                                   .count() > (rf - (rf / 2 + 1)))
-                    {
-                        return false;
-                    }
-                }
-
-                return true;
+                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
             }
         },
         QUORUM
         {
+            // Keyspaces exist with RF 1 or 2

Review Comment:
   This comment does not make sense under QUORUM.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java:
##########
@@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange, Instance casInstance, Strin
         for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet())
         {
             Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue());
-
             newErrorMap.put(casInstance, errMessage);
             mappingsToAdd.put(entry.getKey(), newErrorMap);
         }
         failedRangesMap.putAll(mappingsToAdd);
     }
 
-    public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC)
+    public List<Instance> getFailedInstances()
     {
-        return !getFailedEntries(consistencyLevel, localDC).isEmpty();
+        return failedRangesMap.asMapOfRanges().values()
+                              .stream()
+                              .map(Multimap::keySet)
+                              .flatMap(Collection::stream)
+                              .collect(Collectors.toList());
     }
 
-    @SuppressWarnings("unused")  // Convenience method can become useful in the future
-    public Collection<Range<BigInteger>> getFailedRanges(ConsistencyLevel consistencyLevel, String localDC)
+    /**
+     * Given the number of failed instances for each token range, validates if the consistency guarantees are maintained
+     * for the size of the ring and the consistency level.
+     *
+     * @return list of failed entries for token ranges that break consistency. This should ideally be empty for a
+     * successful operation.
+     */
+    public Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>>
+    getFailedEntries(TokenRangeMapping<? extends CassandraInstance> tokenRangeMapping,
+                     ConsistencyLevel cl,
+                     String localDC)
     {
-        return getFailedEntries(consistencyLevel, localDC).stream()
-                .map(AbstractMap.SimpleEntry::getKey)
-                .collect(Collectors.toList());
-    }
 
-    @SuppressWarnings("unused")  // Convenience method can become useful in the future
-    public Multimap<Instance, String> getFailedInstances(ConsistencyLevel consistencyLevel, String localDC)
-    {
-        Multimap<Instance, String> failedInstances = ArrayListMultimap.create();
-        getFailedEntries(consistencyLevel, localDC).stream()
-                .flatMap(failedMultiEntry -> failedMultiEntry.getValue().entries().stream())
-                .forEach(failedEntry -> failedInstances.put(failedEntry.getKey(), failedEntry.getValue()));
+        List<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>> failedEntries =
+        new ArrayList<>();
+
+        for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> failedRangeEntry
+        : failedRangesMap.asMapOfRanges().entrySet())

Review Comment:
   It is unusual to break this into two lines.
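   For example, the header reads more conventionally on one line:
   ```suggestion
        for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> failedRangeEntry : failedRangesMap.asMapOfRanges().entrySet())
   ```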



##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java:
##########
@@ -209,31 +222,49 @@ private InstancesConfig buildInstancesConfig(CassandraVersionProvider versionPro
             int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
             InetSocketAddress address = InetSocketAddress.createUnresolved(hostName,
                                                                            nativeTransportPort);
-            CQLSessionProvider sessionProvider = new CQLSessionProvider(address, new NettyOptions());
+            addresses.add(address);
+        }
+        for (int i = 0; i < cluster.size(); i++)
+        {
+            IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names;
+            IInstanceConfig config = instance.config();
+            String hostName = JMXUtil.getJmxHost(config);
+            int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
+            InetSocketAddress address = InetSocketAddress.createUnresolved(hostName,
+                                                                           nativeTransportPort);
+            TemporaryCqlSessionProvider sessionProvider = new TemporaryCqlSessionProvider(address, new NettyOptions(), addresses);
             this.sessionProviders.add(sessionProvider);
-            JmxClient jmxClient = new JmxClient(hostName, config.jmxPort());
+            // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized.
+            // In these cases, we want to wait longer than the default retry/delay settings to connect.

Review Comment:
   👍  thanks for the comment



##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/CassandraIntegrationTest.java:
##########
@@ -59,6 +59,13 @@
      */
     int numDcs() default 1;
 
+    /**
+     * This is only applied in context of multi-DC tests. Returns true if the keyspace is replicated
+     * across multiple DCs. Defaults to {@code true}
+     * @return whether the multi-DC test uses a cross-DC keyspace
+     */
+    boolean useCrossDcKeyspace() default true;

Review Comment:
   It is more flexible to define keyspaces in the test directly with some helper method instead of declaring an option in the annotation. For example, you could define keyspaces with a varying number of DCs and RFs per test, as in the sketch below.
   That being said, I am fine with the current code.
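
   A sketch of such a helper (the name and shape are hypothetical, not part of the PR):
   ```java
   import java.util.Map;

   final class KeyspaceHelper
   {
       // Hypothetical helper: each test declares its own topology instead of
       // relying on an annotation flag.
       static String createKeyspaceDdl(String keyspace, Map<String, Integer> rfByDc)
       {
           StringBuilder replication = new StringBuilder("{'class': 'NetworkTopologyStrategy'");
           rfByDc.forEach((dc, rf) -> replication.append(", '").append(dc).append("': ").append(rf));
           replication.append('}');
           return String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s;", keyspace, replication);
       }
   }
   ```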



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java:
##########
@@ -58,27 +58,34 @@ public void testOneSplit()
     @Test
     public void testTwoSplits()
     {
-        CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test");
-        partitioner = new TokenPartitioner(ring, 2, 2, 1, false);
-        assertEquals(10, partitioner.numPartitions());
-        assertEquals(0, getPartitionForToken(new BigInteger("-4611686018427387905")));
-        assertEquals(1, getPartitionForToken(new BigInteger("-4611686018427387904")));
-        assertEquals(1, getPartitionForToken(-1));
-        assertEquals(2, getPartitionForToken(0));  // Single token range
-        assertEquals(3, getPartitionForToken(1));
-        assertEquals(3, getPartitionForToken(50));
-        assertEquals(4, getPartitionForToken(51000));
-        assertEquals(4, getPartitionForToken(51100));
-        assertEquals(5, getPartitionForToken(100001));
-        assertEquals(5, getPartitionForToken(100150));
-        assertEquals(5, getPartitionForToken(150000));
-        assertEquals(6, getPartitionForToken(150001));
-        assertEquals(6, getPartitionForToken(200000));
-        assertEquals(7, getPartitionForToken(200001));
-        assertEquals(7, getPartitionForToken(new BigInteger("4611686018427388003")));
-        assertEquals(7, getPartitionForToken(new BigInteger("4611686018427487903")));
-        assertEquals(8, getPartitionForToken(new BigInteger("4611686018427487904")));
-        assertEquals(9, getPartitionForToken(new BigInteger("9223372036854775807")));  // Single token range
+        final TokenRangeMapping<RingInstance> tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 3);

Review Comment:
   Drop the `final`.
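   That is, the same line without the modifier:
   ```suggestion
           TokenRangeMapping<RingInstance> tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 3);
   ```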



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java:
##########
@@ -26,4 +26,6 @@ public interface CassandraInstance
     String getNodeName();
 
     String getDataCenter();
+
+    String getIpAddress();

Review Comment:
   Add javadoc for the new method. It is fine to leave the existing methods that are without javadoc as they are.
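
   A sketch of minimal javadoc (the wording is illustrative):
   ```java
   /**
    * Returns the IP address of this Cassandra instance.
    *
    * @return the instance's IP address
    */
   String getIpAddress();
   ```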



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
             }
 
             @Override
-            public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts,
-                                            ReplicationFactor replicationFactor,
+            public boolean checkConsistency(Set<String> writeReplicas,
+                                            Set<String> pendingReplicas,
+                                            Set<String> replacementInstances,
+                                            Set<String> blockedInstances,
+                                            Set<String> failedInstanceIps,
                                             String localDC)
             {
-                Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy,
-                                            "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces");
-
-                for (String datacenter : replicationFactor.getOptions().keySet())
-                {
-                    int rf = replicationFactor.getOptions().get(datacenter);
-                    if (failedInsts.stream()
-                                   .filter(instance -> instance.getDataCenter().matches(datacenter))
-                                   .count() > (rf - (rf / 2 + 1)))
-                    {
-                        return false;
-                    }
-                }
-
-                return true;
+                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));

Review Comment:
   Looks like you moved the per-DC check into `ReplicaAwareFailureHandler#validateConsistency`.



##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.testing;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.DriverInternalError;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.jetbrains.annotations.Nullable;
+
+public class TemporaryCqlSessionProvider extends CQLSessionProvider
+{
+    private static final Logger logger = LoggerFactory.getLogger(TemporaryCqlSessionProvider.class);
+    private Session localSession;
+    private final InetSocketAddress inet;
+    private final NettyOptions nettyOptions;
+    private final ReconnectionPolicy reconnectionPolicy;
+    private final List<InetSocketAddress> addresses = new ArrayList<>();
+
+    public TemporaryCqlSessionProvider(InetSocketAddress target, NettyOptions options, List<InetSocketAddress> addresses)
+    {
+        super(target, options);
+        inet = target;
+        nettyOptions = options;
+        reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
+        this.addresses.addAll(addresses);
+    }
+
+    @Override
+    public synchronized @Nullable Session localCql()

Review Comment:
   nit
   ```suggestion
       @Override @Nullable
       public synchronized Session localCql()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

