arjunashok commented on code in PR #35:
URL: 
https://github.com/apache/cassandra-analytics/pull/35#discussion_r1468249360


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -274,16 +279,42 @@ private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> r
                      .collect(Collectors.toSet());
     }
 
-    private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping, Range<BigInteger> taskTokenRange)
+    private void validateTaskTokenRangeMappings(int partitionId,
+                                                TokenRangeMapping<RingInstance> startTaskMapping,
+                                                Range<BigInteger> taskTokenRange)
     {
         // Get the uncached, current view of the ring to compare with initial ring
         TokenRangeMapping<RingInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false);
         Map<Range<BigInteger>, List<RingInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange);
         Map<Range<BigInteger>, List<RingInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange);
 
+        Set<RingInstance> initialInstances = instancesFromMapping(startMapping);
+        Set<RingInstance> endInstances = instancesFromMapping(endMapping);
         // Token ranges are identical and overall instance list is same
-        return !(startMapping.keySet().equals(endMapping.keySet()) &&
-               instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping)));
+        boolean haveMappingsChanged = !(startMapping.keySet().equals(endMapping.keySet()) &&
+                                        initialInstances.equals(endInstances));
+        if (haveMappingsChanged)
+        {
+            Set<Range<BigInteger>> rangeDelta = symmetricDifference(startMapping.keySet(), endMapping.keySet());
+            Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances).stream()
+                                                                                           .map(RingInstance::getIpAddress)
+                                                                                           .collect(Collectors.toSet());
+            String message = String.format("[%s] Token range mappings have changed since the task started " +
+                                           "with non-overlapping instances: %s and ranges: %s",
+                                           partitionId,
+                                           instanceDelta,
+                                           rangeDelta);
+            LOGGER.error(message);
+            throw new RuntimeException(message);
+        }
+    }
+
+    public static <T> Set<T> symmetricDifference(Set<T> set1, Set<T> set2)

Review Comment:
   Will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to