arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1417772294
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI Map<String, Object> valueMap = new HashMap<>(); try { - List<RingInstance> exclusions = failureHandler.getFailedInstances(); Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() .stream() - .filter(e -> !exclusions.contains(e.getValue())) .map(Map.Entry::getKey) .collect(Collectors.toSet()); + Range<BigInteger> tokenRange = getTokenRange(taskContext); + Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ? + Collections.singleton(tokenRange) : + getIntersectingSubRanges(newRanges, tokenRange); while (dataIterator.hasNext()) { Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next(); - streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler); - - sessions.add(streamSession); + streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results); maybeCreateTableWriter(partitionId, baseDir); writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange()); checkBatchSize(streamSession, partitionId, job); } - // Finalize SSTable for the last StreamSession - if (sstableWriter != null || (streamSession != null && batchSize != 0)) + // Cleanup SSTable writer and schedule the last stream Review Comment: Makes sense. 
########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ########## @@ -346,19 +366,22 @@ void writeBuffered() private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator<Tuple2<DecoratedKey, Object[]>> data, - String[] columnNames) + String[] columnNames) throws InterruptedException { validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES); } private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator<Tuple2<DecoratedKey, Object[]>> data, String[] columnNames, - int uploadedTables) + int uploadedTables) throws InterruptedException { RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new); rw.write(data); + // Wait for uploads to finish + Thread.sleep(500); Review Comment: In general, I agree that the sleep introduces flakiness. It was added because, when the entire test suite was executed, we saw that the uploads had not finished by the time we looked up the number of files that were uploaded. We could potentially use a latch in the `MockBulkWriterContext` to make this more deterministic. Will explore some more. 
########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java: ########## @@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws NoSuchElementException * @return instance meta information * @throws NoSuchElementException when the instance for {@code host} does not exist */ + @Override public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException { - return cassandraTestContext.instancesConfig.instanceFromHost(host); + return cassandraTestContext.instancesConfig().instanceFromHost(host); } } @Provides @Singleton public SidecarConfiguration sidecarConfiguration() { - return new SidecarConfigurationImpl(new ServiceConfigurationImpl("127.0.0.1")); + ServiceConfiguration conf = ServiceConfigurationImpl.builder() + .host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long Review Comment: Will defer this one to @JeetKunDoug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org