frankgh commented on code in PR #35: URL: https://github.com/apache/cassandra-analytics/pull/35#discussion_r1468214477
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -108,12 +109,23 @@ private String getStreamId(TaskContext taskContext) public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator) { TaskContext taskContext = taskContextSupplier.get(); + LOGGER.info("[{}]: Processing bulk writer partition", taskContext.partitionId()); Review Comment: debug level? ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -108,12 +109,23 @@ private String getStreamId(TaskContext taskContext) public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterator) { TaskContext taskContext = taskContextSupplier.get(); + LOGGER.info("[{}]: Processing bulk writer partition", taskContext.partitionId()); + Range<BigInteger> taskTokenRange = getTokenRange(taskContext); Preconditions.checkState(!taskTokenRange.isEmpty(), "Token range for the partition %s is empty", taskTokenRange); TokenRangeMapping<RingInstance> initialTokenRangeMapping = writerContext.cluster().getTokenRangeMapping(false); + LOGGER.info("[{}]: Fetched token range mapping for keyspace:{} with write replicas:{} containing pending " + + "replicas:{}, blocked instances: {}, replacement instances: {}", Review Comment: debug level? 
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -274,16 +279,42 @@ private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> r .collect(Collectors.toSet()); } - private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping, Range<BigInteger> taskTokenRange) + private void validateTaskTokenRangeMappings(int partitionId, + TokenRangeMapping<RingInstance> startTaskMapping, + Range<BigInteger> taskTokenRange) { // Get the uncached, current view of the ring to compare with initial ring TokenRangeMapping<RingInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false); Map<Range<BigInteger>, List<RingInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange); Map<Range<BigInteger>, List<RingInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange); + Set<RingInstance> initialInstances = instancesFromMapping(startMapping); + Set<RingInstance> endInstances = instancesFromMapping(endMapping); // Token ranges are identical and overall instance list is same - return !(startMapping.keySet().equals(endMapping.keySet()) && - instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping))); + boolean haveMappingsChanged = !(startMapping.keySet().equals(endMapping.keySet()) && + initialInstances.equals(endInstances)); + if (haveMappingsChanged) + { + Set<Range<BigInteger>> rangeDelta = symmetricDifference(startMapping.keySet(), endMapping.keySet()); + Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances).stream() + .map(RingInstance::getIpAddress) + .collect(Collectors.toSet()); + String message = String.format("[%s] Token range mappings have changed since the task started " + + "with non-overlapping instances: %s and ranges: %s", + partitionId, + instanceDelta, + rangeDelta); + LOGGER.error(message); + throw new RuntimeException(message); + } + } + + public 
static <T> Set<T> symmetricDifference(Set<T> set1, Set<T> set2) Review Comment: It feels like we should be able to test this code. Maybe add some unit tests for it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org