arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1417772294


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##########
@@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
-            List<RingInstance> exclusions = failureHandler.getFailedInstances();
             Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
                                                                        .stream()
-                                                                       .filter(e -> !exclusions.contains(e.getValue()))
                                                                        .map(Map.Entry::getKey)
                                                                        .collect(Collectors.toSet());
+            Range<BigInteger> tokenRange = getTokenRange(taskContext);
+            Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+                                               Collections.singleton(tokenRange) :
+                                               getIntersectingSubRanges(newRanges, tokenRange);

             while (dataIterator.hasNext())
             {
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
-                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
-
-                sessions.add(streamSession);
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results);
                 maybeCreateTableWriter(partitionId, baseDir);
                 writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
                 checkBatchSize(streamSession, partitionId, job);
             }

-            // Finalize SSTable for the last StreamSession
-            if (sstableWriter != null || (streamSession != null && batchSize != 0))
+            // Cleanup SSTable writer and schedule the last stream

Review Comment:
   Makes sense.
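   For context, a minimal sketch of what `getIntersectingSubRanges` could look like using Guava ranges. The signature is inferred from the call site above, and the class name is purely illustrative; the PR's actual implementation is authoritative.

       import java.math.BigInteger;
       import java.util.LinkedHashSet;
       import java.util.Set;

       import com.google.common.collect.Range;

       final class SubRangeSketch
       {
           // Keep the overlapping portion of every ring range that intersects the
           // task's token range. isConnected() is also true for merely adjacent
           // ranges, so empty intersections are discarded.
           static Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges,
                                                                  Range<BigInteger> tokenRange)
           {
               Set<Range<BigInteger>> subRanges = new LinkedHashSet<>();
               for (Range<BigInteger> range : ranges)
               {
                   if (range.isConnected(tokenRange))
                   {
                       Range<BigInteger> intersection = range.intersection(tokenRange);
                       if (!intersection.isEmpty())
                       {
                           subRanges.add(intersection);
                       }
                   }
               }
               return subRanges;
           }
       }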



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##########
@@ -346,19 +366,22 @@ void writeBuffered()
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
-                                         String[] columnNames)
+                                         String[] columnNames) throws InterruptedException
     {
         validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
     }

     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames,
-                                         int uploadedTables)
+                                         int uploadedTables) throws InterruptedException
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
+        // Wait for uploads to finish
+        Thread.sleep(500);

Review Comment:
   In general, I agree that the sleep introduces flakiness. It was added because, when the entire test suite was executed, we did see uploads not finishing before we check the number of uploaded files. We could potentially use a latch in the `MockBulkWriterContext` to make this more deterministic. Will explore some more.
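   A minimal sketch of the latch idea, assuming a hypothetical `setUploadListener` hook on `MockBulkWriterContext` (no such hook exists yet; it would need to be added):

       import java.util.concurrent.CountDownLatch;
       import java.util.concurrent.TimeUnit;

       import static org.junit.jupiter.api.Assertions.assertTrue;

       // Inside validateSuccessfulWrite: one latch count per expected upload.
       CountDownLatch uploadsComplete = new CountDownLatch(uploadedTables);
       // setUploadListener is an assumed hook invoked once per finished upload.
       writerContext.setUploadListener(upload -> uploadsComplete.countDown());

       RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
       rw.write(data);

       // Bounded wait: fails with a clear message instead of flaking on a fixed sleep.
       assertTrue(uploadsComplete.await(10, TimeUnit.SECONDS), "uploads did not complete in time");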



##########
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java:
##########
@@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws NoSuchElementException
          * @return instance meta information
          * @throws NoSuchElementException when the instance for {@code host} does not exist
          */
+        @Override
         public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException
         {
-            return cassandraTestContext.instancesConfig.instanceFromHost(host);
+            return cassandraTestContext.instancesConfig().instanceFromHost(host);
         }
     }

     @Provides
     @Singleton
     public SidecarConfiguration sidecarConfiguration()
     {
-        return new SidecarConfigurationImpl(new ServiceConfigurationImpl("127.0.0.1"));
+        ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+                                                            .host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long

Review Comment:
   Will defer this one to @JeetKunDoug.
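   One possible shape, sketched with an assumed system-property toggle so the test Sidecar stays on loopback unless a run explicitly opts in (the property name and the trailing build() call are illustrative; only builder() and host() are confirmed by the diff):

       // Default to loopback; bind all interfaces only when explicitly requested.
       String host = Boolean.getBoolean("sidecar.test.bindAllInterfaces")
                     ? "0.0.0.0"    // opt-in, e.g. for cross-host debugging
                     : "127.0.0.1"; // loopback-only default
       ServiceConfiguration conf = ServiceConfigurationImpl.builder()
                                                           .host(host)
                                                           .build(); // remaining builder options elided; see the PR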



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

