yifan-c commented on code in PR #30:
URL: 
https://github.com/apache/cassandra-analytics/pull/30#discussion_r1447772924


##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -120,6 +122,34 @@ public void testWriteFailWhenTopologyChangeWithinTask()
         assertThat(ex.getMessage(), endsWith("Token range mappings have 
changed since the task started"));
     }
 
+    @Test
+        public void testWriteWithBlockedInstances()

Review Comment:
   Remove the extra leading spaces: the method declaration is indented deeper than the `@Test` annotation above it.



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java:
##########
@@ -54,6 +54,41 @@ public static TokenRangeMapping<RingInstance> 
buildTokenRangeMapping(final int i
         return buildTokenRangeMapping(initialToken, rfByDC, instancesPerDC, 
false, -1);
     }
 
+    public static TokenRangeMapping<RingInstance> 
buildTokenRangeMappingWithBlockedInstance(int initialToken,
+                                                                               
             ImmutableMap<String, Integer> rfByDC,
+                                                                               
             int instancesPerDC)
+    {
+        final List<RingInstance> instances = getInstances(initialToken, 
rfByDC, instancesPerDC);

Review Comment:
   Remove `final` from this local variable declaration.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java:
##########
@@ -122,27 +122,43 @@ private void updateInstanceAvailability()
 
     private void validateAvailabilityAndUpdateFailures(RingInstance instance, 
InstanceAvailability availability)
     {
-        if (availability == InstanceAvailability.INVALID_STATE)
+        switch (availability)
         {
-            // If we find any nodes in a totally invalid state, just throw as 
we can't continue
-            String message = String.format("Instance (%s) is in an invalid 
state (%s) during import. "
-                                           + "Please rerun import once 
topology changes are complete.",
-                                           instance.getNodeName(), 
cluster.getInstanceState(instance));
-            throw new RuntimeException(message);
-        }
-
-        if (availability == InstanceAvailability.UNAVAILABLE_BLOCKED
-            || availability == InstanceAvailability.UNAVAILABLE_DOWN)
-        {
-            Collection<Range<BigInteger>> failedRanges = 
cluster.getTokenRangeMapping(true)
-                                                                
.getTokenRanges()
-                                                                .get(instance);
-            failedRanges.forEach(failedRange -> {
-                String nodeDisplayName = instance.getNodeName();
-                String message = String.format("%s %s", nodeDisplayName, 
availability.getMessage());
-                LOGGER.warn("{} failed in phase {} on {} because {}", 
failedRange, phase, nodeDisplayName, message);
-                failureHandler.addFailure(failedRange, instance, message);
-            });
+            case INVALID_STATE:
+                // If we find any nodes in a totally invalid state, just throw 
as we can't continue
+                String errorMessage = String.format("Instance (%s) is in an 
invalid state (%s) during import. "
+                                                    + "Please rerun import 
once topology changes are complete.",
+                                                    instance.getNodeName(), 
cluster.getInstanceState(instance));
+                throw new RuntimeException(errorMessage);
+
+            // Check for blocked instances and ranges for the purpose of 
logging only.
+            // We check for blocked instances while validating consistency 
level requirements
+            case UNAVAILABLE_BLOCKED:
+                Collection<Range<BigInteger>> rangesInBlockedInstance = 
cluster.getTokenRangeMapping(true)
+                                                                               
.getTokenRanges()
+                                                                               
.get(instance);
+                rangesInBlockedInstance.forEach(failedRange -> {
+                    String nodeDisplayName = instance.getNodeName();
+                    String message = String.format("%s %s", nodeDisplayName, 
availability.getMessage());
+                    LOGGER.warn("{} failed in phase {} on {} because {}", 
failedRange, phase, nodeDisplayName, message);
+                });
+                break;
+
+            case UNAVAILABLE_DOWN:
+                Collection<Range<BigInteger>> failedRanges = 
cluster.getTokenRangeMapping(true)
+                                                                    
.getTokenRanges()
+                                                                    
.get(instance);
+                failedRanges.forEach(failedRange -> {
+                    String nodeDisplayName = instance.getNodeName();
+                    String message = String.format("%s %s", nodeDisplayName, 
availability.getMessage());
+                    LOGGER.warn("{} failed in phase {} on {} because {}", 
failedRange, phase, nodeDisplayName, message);
+                    failureHandler.addFailure(failedRange, instance, message);
+                });
+                break;

Review Comment:
   nit: the `UNAVAILABLE_BLOCKED` and `UNAVAILABLE_DOWN` cases can be merged to remove some duplication
   
   ```suggestion
               case UNAVAILABLE_BLOCKED:
               case UNAVAILABLE_DOWN:
                   boolean shouldAddFailure = availability == 
InstanceAvailability.UNAVAILABLE_DOWN;
   
                   Collection<Range<BigInteger>> unavailableRanges = 
cluster.getTokenRangeMapping(true)
                                                                            
.getTokenRanges()
                                                                            
.get(instance);
                   unavailableRanges.forEach(failedRange -> {
                       String nodeDisplayName = instance.getNodeName();
                       String message = String.format("%s %s", nodeDisplayName, 
availability.getMessage());
                       LOGGER.warn("{} failed in phase {} on {} because {}", 
failedRange, phase, nodeDisplayName, message);
                       if (shouldAddFailure)
                       {
                           failureHandler.addFailure(failedRange, instance, 
message);
                       }
                   });
                   break;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to