arjunashok commented on code in PR #35: URL: https://github.com/apache/cassandra-analytics/pull/35#discussion_r1468249360
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -274,16 +279,42 @@ private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> r .collect(Collectors.toSet()); } - private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping, Range<BigInteger> taskTokenRange) + private void validateTaskTokenRangeMappings(int partitionId, + TokenRangeMapping<RingInstance> startTaskMapping, + Range<BigInteger> taskTokenRange) { // Get the uncached, current view of the ring to compare with initial ring TokenRangeMapping<RingInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false); Map<Range<BigInteger>, List<RingInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange); Map<Range<BigInteger>, List<RingInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange); + Set<RingInstance> initialInstances = instancesFromMapping(startMapping); + Set<RingInstance> endInstances = instancesFromMapping(endMapping); // Token ranges are identical and overall instance list is same - return !(startMapping.keySet().equals(endMapping.keySet()) && - instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping))); + boolean haveMappingsChanged = !(startMapping.keySet().equals(endMapping.keySet()) && + initialInstances.equals(endInstances)); + if (haveMappingsChanged) + { + Set<Range<BigInteger>> rangeDelta = symmetricDifference(startMapping.keySet(), endMapping.keySet()); + Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances).stream() + .map(RingInstance::getIpAddress) + .collect(Collectors.toSet()); + String message = String.format("[%s] Token range mappings have changed since the task started " + + "with non-overlapping instances: %s and ranges: %s", + partitionId, + instanceDelta, + rangeDelta); + LOGGER.error(message); + throw new RuntimeException(message); + } + } + + public static <T> Set<T> symmetricDifference(Set<T> set1, Set<T> set2) Review Comment: Will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org