arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411327830


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -110,20 +132,47 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
+            List<RingInstance> exclusions = failureHandler.getFailedInstances();
+            Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+                                                                       .stream()
+                                                                       .filter(e -> !exclusions.contains(e.getValue()))
+                                                                       .map(Map.Entry::getKey)
+                                                                       .collect(Collectors.toSet());
+
             while (dataIterator.hasNext())
             {
+                Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
+
+                sessions.add(streamSession);

Review Comment:
   This was done to keep the session-closing logic in one place, instead of 
scattering checks for non-existent sessions throughout the method. 
   
   However, on closer inspection, we do not actually need to eagerly create 
the session for the partition's token range. I will update this so that the 
session is created lazily.
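
A minimal sketch of what lazy creation could look like, not the actual change. `LazySessionSketch`, its nested `Session` class, and the simplified `write` signature are all hypothetical stand-ins for illustration; the real `RecordWriter` and stream session types have different shapes. The idea is that a session is opened only when the first row arrives, and every session that was opened is closed in a single place.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; none of these names come from the PR.
class LazySessionSketch
{
    // Stand-in for a stream session: it records rows and whether it was closed.
    static final class Session
    {
        final List<String> rows = new ArrayList<>();
        boolean closed = false;

        void addRow(String row)
        {
            rows.add(row);
        }

        void close()
        {
            closed = true;
        }
    }

    // Opens a session only once the first row is seen, so an empty
    // partition never creates (or has to close) a session.
    static List<Session> write(Iterator<String> dataIterator)
    {
        List<Session> sessions = new ArrayList<>();
        Session current = null;
        try
        {
            while (dataIterator.hasNext())
            {
                String row = dataIterator.next();
                if (current == null)
                {
                    // Lazy creation: deferred until there is data to write.
                    current = new Session();
                    sessions.add(current);
                }
                current.addRow(row);
            }
        }
        finally
        {
            // Single closure point: only sessions that were actually
            // created are in the list, so no null checks are needed here.
            for (Session session : sessions)
            {
                session.close();
            }
        }
        return sessions;
    }
}
```

With this shape, the closing logic stays in one place, and the list of sessions is simply empty when the iterator yields no rows.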



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


