frankgh commented on code in PR #35:
URL: https://github.com/apache/cassandra-analytics/pull/35#discussion_r1468214477


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -108,12 +109,23 @@ private String getStreamId(TaskContext taskContext)
     public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> 
sourceIterator)
     {
         TaskContext taskContext = taskContextSupplier.get();
+        LOGGER.info("[{}]: Processing bulk writer partition", 
taskContext.partitionId());

Review Comment:
   Should this be logged at debug level rather than info?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -108,12 +109,23 @@ private String getStreamId(TaskContext taskContext)
     public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> 
sourceIterator)
     {
         TaskContext taskContext = taskContextSupplier.get();
+        LOGGER.info("[{}]: Processing bulk writer partition", 
taskContext.partitionId());
+
         Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
         Preconditions.checkState(!taskTokenRange.isEmpty(),
                                  "Token range for the partition %s is empty",
                                  taskTokenRange);
 
         TokenRangeMapping<RingInstance> initialTokenRangeMapping = 
writerContext.cluster().getTokenRangeMapping(false);
+        LOGGER.info("[{}]: Fetched token range mapping for keyspace:{} with 
write replicas:{} containing pending " +
+                    "replicas:{}, blocked instances: {}, replacement 
instances: {}",

Review Comment:
   Should this be logged at debug level rather than info?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -274,16 +279,42 @@ private Set<Range<BigInteger>> 
getIntersectingSubRanges(Set<Range<BigInteger>> r
                      .collect(Collectors.toSet());
     }
 
-    private boolean 
haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping, 
Range<BigInteger> taskTokenRange)
+    private void validateTaskTokenRangeMappings(int partitionId,
+                                                
TokenRangeMapping<RingInstance> startTaskMapping,
+                                                Range<BigInteger> 
taskTokenRange)
     {
         // Get the uncached, current view of the ring to compare with initial 
ring
         TokenRangeMapping<RingInstance> endTaskMapping = 
writerContext.cluster().getTokenRangeMapping(false);
         Map<Range<BigInteger>, List<RingInstance>> startMapping = 
taskTokenRangeMapping(startTaskMapping, taskTokenRange);
         Map<Range<BigInteger>, List<RingInstance>> endMapping = 
taskTokenRangeMapping(endTaskMapping, taskTokenRange);
 
+        Set<RingInstance> initialInstances = 
instancesFromMapping(startMapping);
+        Set<RingInstance> endInstances = instancesFromMapping(endMapping);
         // Token ranges are identical and overall instance list is same
-        return !(startMapping.keySet().equals(endMapping.keySet()) &&
-               
instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping)));
+        boolean haveMappingsChanged = 
!(startMapping.keySet().equals(endMapping.keySet()) &&
+                                        initialInstances.equals(endInstances));
+        if (haveMappingsChanged)
+        {
+            Set<Range<BigInteger>> rangeDelta = 
symmetricDifference(startMapping.keySet(), endMapping.keySet());
+            Set<String> instanceDelta = symmetricDifference(initialInstances, 
endInstances).stream()
+                                                                               
            .map(RingInstance::getIpAddress)
+                                                                               
            .collect(Collectors.toSet());
+            String message = String.format("[%s] Token range mappings have 
changed since the task started " +
+                                           "with non-overlapping instances: %s 
and ranges: %s",
+                                           partitionId,
+                                           instanceDelta,
+                                           rangeDelta);
+            LOGGER.error(message);
+            throw new RuntimeException(message);
+        }
+    }
+
+    public static <T> Set<T> symmetricDifference(Set<T> set1, Set<T> set2)

Review Comment:
   It feels like we should be able to test this code. Could you add some unit
   tests for it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to