arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420981329


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java:
##########
@@ -23,103 +23,120 @@
 import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 
-public class BulkWriteValidator implements AutoCloseable
+public class BulkWriteValidator
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BulkWriteValidator.class);
 
     private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
-    private final CassandraRingMonitor monitor;
     private final JobInfo job;
     private String phase = "Initializing";
 
     private final ClusterInfo cluster;
 
     public BulkWriteValidator(BulkWriterContext bulkWriterContext,
-                              Consumer<CancelJobEvent> cancelJobFunc) throws 
Exception
+                              ReplicaAwareFailureHandler<RingInstance> 
failureHandler)
     {
         cluster = bulkWriterContext.cluster();
         job = bulkWriterContext.job();
-        failureHandler = new 
ReplicaAwareFailureHandler<>(cluster.getRing(true));
-        monitor = new CassandraRingMonitor(cluster, cancelJobFunc, 1000, 
TimeUnit.MILLISECONDS);
+        this.failureHandler = failureHandler;
     }
 
-    public void setPhase(String phase)
-    {
-        this.phase = phase;
-    }
-
-    public String getPhase()
-    {
-        return phase;
-    }
-
-    public void validateInitialEnvironment()
-    {
-        validateCLOrFail();
-    }
-
-    public void close()
-    {
-        monitor.stop();
-    }
-
-    private void validateCLOrFail()
-    {
-        updateInstanceAvailability();
-        validateClOrFail(failureHandler, LOGGER, phase, job);
-    }
-
-    public static void 
validateClOrFail(ReplicaAwareFailureHandler<RingInstance> failureHandler,
+    public static void validateClOrFail(TokenRangeMapping<RingInstance> 
tokenRangeMapping,
+                                        
ReplicaAwareFailureHandler<RingInstance> failureHandler,
                                         Logger logger,
                                         String phase,
                                         JobInfo job)
     {
         Collection<AbstractMap.SimpleEntry<Range<BigInteger>, 
Multimap<RingInstance, String>>> failedRanges =
-                failureHandler.getFailedEntries(job.getConsistencyLevel(), 
job.getLocalDC());
+        failureHandler.getFailedEntries(tokenRangeMapping, 
job.getConsistencyLevel(), job.getLocalDC());
+
         if (failedRanges.isEmpty())
         {
             logger.info("Succeeded {} with {}", phase, 
job.getConsistencyLevel());
         }
         else
         {
-            String message = String.format("Failed to load %s ranges with %s 
for job %s in phase %s",
+            String message = String.format("Failed to load %s ranges with %s 
for job %s in phase %s.",
                                            failedRanges.size(), 
job.getConsistencyLevel(), job.getId(), phase);
             logger.error(message);
-            failedRanges.forEach(failedRange -> 
failedRange.getValue().keySet().forEach(instance ->
-                    logger.error("Failed {} for {} on {}", phase, 
failedRange.getKey(), instance.toString())));
+            failedRanges.forEach(failedRange ->
+                                 failedRange.getValue()
+                                            .keySet()
+                                            .forEach(instance ->
+                                                     logger.error("Failed {} 
for {} on {}",
+                                                                  phase,
+                                                                  
failedRange.getKey(),
+                                                                  
instance.toString())));
             throw new RuntimeException(message);
         }
     }
 
-    public static void updateFailureHandler(CommitResult commitResult,
-                                            String phase,
+    public String getPhase()
+    {
+        return phase;
+    }
+
+    public void setPhase(String phase)
+    {
+        this.phase = phase;
+    }
+
+    public static void updateFailureHandler(CommitResult commitResult, String 
phase,
                                             
ReplicaAwareFailureHandler<RingInstance> failureHandler)
     {
         LOGGER.debug("Commit Result: {}", commitResult);
         commitResult.failures.forEach((uuid, err) -> {
             LOGGER.warn("[{}]: {} failed on {} with message {}",
-                    uuid, phase, commitResult.instance.getNodeName(), 
err.errMsg);
+                        uuid,
+                        phase,
+                        commitResult.instance.getNodeName(),
+                        err.errMsg);
             failureHandler.addFailure(err.tokenRange, commitResult.instance, 
err.errMsg);
         });
     }
 
+    public void validateClOrFail(TokenRangeMapping<RingInstance> 
tokenRangeMapping)
+    {
+        // Updates failures by looking up instance metadata
+        updateInstanceAvailability();
+        // Fails if the failures violate consistency requirements
+        validateClOrFail(tokenRangeMapping, failureHandler, LOGGER, phase, 
job);
+    }
+
     private void updateInstanceAvailability()
     {
         cluster.refreshClusterInfo();
         Map<RingInstance, InstanceAvailability> availability = 
cluster.getInstanceAvailability();
         availability.forEach(this::checkInstance);
     }
 
+    private void updateFailureHandler(RingInstance instance, 
InstanceAvailability availability)

Review Comment:
   Updated to make the availability checks more cohesive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to