arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411327830
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -110,20 +132,47 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
+            List<RingInstance> exclusions = failureHandler.getFailedInstances();
+            Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+                                                                       .stream()
+                                                                       .filter(e -> !exclusions.contains(e.getValue()))
+                                                                       .map(Map.Entry::getKey)
+                                                                       .collect(Collectors.toSet());
+            while (dataIterator.hasNext())
+            {
+                Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
+
+                sessions.add(streamSession);

Review Comment:
   This was done to keep the session-closure logic in one place rather than scattering it among checks for non-existent sessions. On closer inspection, however, we do not actually need to eagerly create the session for the partition's token range. I will update the code so the session is created lazily.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
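[Editor's sketch] The lazy creation the reviewer proposes could look roughly like the following. All names here (`TokenRange`, `StreamSession`, the `maybeCreateStreamSession` signature) are simplified stand-ins for the bulk-writer classes, not the actual Cassandra Analytics API; the point is only that a session is constructed, and added to `sessions`, the first time a row actually needs it, rather than eagerly up front:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of lazy stream-session creation. Not the real
// RecordWriter API: TokenRange and StreamSession are hypothetical
// stand-ins with just enough behavior to show the pattern.
public class LazySessionSketch
{
    // A hypothetical half-open token range [start, end)
    record TokenRange(BigInteger start, BigInteger end)
    {
        boolean contains(BigInteger token)
        {
            return token.compareTo(start) >= 0 && token.compareTo(end) < 0;
        }
    }

    // A hypothetical stream session bound to a single token range
    record StreamSession(TokenRange range)
    {
    }

    // Create a session only when needed: on the first row seen, or when the
    // row's token falls outside the current session's range. The session is
    // registered in `sessions` exactly once, at creation time, so no caller
    // has to check for non-existent sessions later.
    static StreamSession maybeCreateStreamSession(StreamSession current,
                                                  BigInteger token,
                                                  List<TokenRange> ranges,
                                                  List<StreamSession> sessions)
    {
        if (current != null && current.range().contains(token))
        {
            return current; // reuse the existing session
        }
        TokenRange match = ranges.stream()
                                 .filter(r -> r.contains(token))
                                 .findFirst()
                                 .orElseThrow(() -> new IllegalStateException("No range for token " + token));
        StreamSession next = new StreamSession(match);
        sessions.add(next);
        return next;
    }
}
```

With this shape, a partition whose rows all land in one range creates exactly one session, and ranges that receive no rows create none, which is the benefit of deferring creation that the comment alludes to.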