yifan-c commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1416591381
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -207,46 +203,62 @@ private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingI private StreamSession maybeCreateStreamSession(TaskContext taskContext, StreamSession streamSession, Tuple2<DecoratedKey, Object[]> rowData, - Set<Range<BigInteger>> newRanges, - ReplicaAwareFailureHandler<RingInstance> failureHandler) throws IOException + Set<Range<BigInteger>> subRanges, + ReplicaAwareFailureHandler<RingInstance> failureHandler, + List<StreamResult> results) + throws IOException, ExecutionException, InterruptedException { BigInteger token = rowData._1().getToken(); Range<BigInteger> tokenRange = getTokenRange(taskContext); Preconditions.checkState(tokenRange.contains(token), String.format("Received Token %s outside of expected range %s", token, tokenRange)); - // token range for this partition is not among the write-replica-set ranges - if (!newRanges.contains(tokenRange)) + // We have split ranges likely resulting from pending nodes + // Evaluate creating a new session if the token from current row is part of a sub-range + if (subRanges.size() > 1) { - Set<Range<BigInteger>> subRanges = getIntersectingSubRanges(newRanges, tokenRange); - // We have split ranges - likely resulting from pending nodes - if (subRanges.size() > 1) - { - // Create session using sub-range that contains the token from current row - Range<BigInteger> matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); - Preconditions.checkState(matchingRange != null, - String.format("Received Token %s outside of expected range %s", token, matchingRange)); + // Create session using sub-range that contains the token from current row + Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); + Preconditions.checkState(matchingSubRange != null, + String.format("Received Token %s outside of expected range %s", token, matchingSubRange)); Review Comment: The `checkState` will never see `matchingSubRange == null`: at line#222, if the value is null, the `get()` call already throws an exception. If the intent is to provide a more user-friendly error message, can you avoid calling `get()`, capture the result in an `Optional<Range<BigInteger>> matchingSubRangeOpt`, and run `checkState` on the optional instead? ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI Map<String, Object> valueMap = new HashMap<>(); try { - List<RingInstance> exclusions = failureHandler.getFailedInstances(); Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() .stream() - .filter(e -> !exclusions.contains(e.getValue())) .map(Map.Entry::getKey) .collect(Collectors.toSet()); + Range<BigInteger> tokenRange = getTokenRange(taskContext); + Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+ Collections.singleton(tokenRange) : + getIntersectingSubRanges(newRanges, tokenRange); while (dataIterator.hasNext()) { Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next(); - streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler); - - sessions.add(streamSession); + streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results); maybeCreateTableWriter(partitionId, baseDir); writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange()); checkBatchSize(streamSession, partitionId, job); } - // Finalize SSTable for the last StreamSession - if (sstableWriter != null || (streamSession != null && batchSize != 0)) + // Cleanup SSTable writer and schedule the last stream Review Comment: "Cleanup SSTable writer" reads wrong to me. I would stick with "Finalize". The code flushes any remaining data to the sstable by closing the writer; "Cleanup" leads me to think it removes files. ########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java: ########## @@ -50,13 +51,36 @@ private TokenRangeMappingUtils() public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(final int initialToken, final ImmutableMap<String, Integer> rfByDC, int instancesPerDC) { + return buildTokenRangeMapping(initialToken, rfByDC, instancesPerDC, false, -1); + } + public static TokenRangeMapping<RingInstance> buildTokenRangeMappingWithFailures(int initialToken, + ImmutableMap<String, Integer> rfByDC, + int instancesPerDC) + { final List<RingInstance> instances = getInstances(initialToken, rfByDC, instancesPerDC); + RingInstance instance = instances.remove(0); + RingEntry entry = instance.getRingInstance(); + RingEntry newEntry = new RingEntry.Builder() + .datacenter(entry.datacenter()) + .port(entry.port()) + .address(entry.address()) + .status(InstanceStatus.DOWN.name()) + .state(entry.state()) + .token(entry.token()) + .fqdn(entry.fqdn()) + .rack(entry.rack()) + .owns(entry.owns()) + .load(entry.load()) + .hostId(entry.hostId()) + .build(); + RingInstance newInstance = new RingInstance(newEntry); + instances.add(0, newInstance); ReplicationFactor replicationFactor = getReplicationFactor(rfByDC); Map<String, Set<String>> writeReplicas = - instances.stream() - .collect(Collectors.groupingBy(RingInstance::getDataCenter, - Collectors.mapping(RingInstance::getNodeName, Collectors.toSet()))); + instances.stream().collect(Collectors.groupingBy(RingInstance::getDataCenter, + Collectors.mapping(RingInstance::getNodeName, + Collectors.toSet()))); Review Comment: nit: prefer not to reformat when there is no actual code change.
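To illustrate the `Optional` suggestion from the first review comment above (on `maybeCreateStreamSession`), a minimal sketch using the names from the diff:

```java
// Capture the lookup result instead of calling get(), so the friendly message
// from checkState is what the caller sees, not get()'s NoSuchElementException.
Optional<Range<BigInteger>> matchingSubRangeOpt = subRanges.stream()
                                                           .filter(r -> r.contains(token))
                                                           .findFirst();
Preconditions.checkState(matchingSubRangeOpt.isPresent(),
                         String.format("Received Token %s outside of expected sub-ranges %s", token, subRanges));
Range<BigInteger> matchingSubRange = matchingSubRangeOpt.get();
```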
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java: ########## @@ -23,103 +23,120 @@ import java.util.AbstractMap; import java.util.Collection; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import com.google.common.collect.Multimap; import com.google.common.collect.Range; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; +import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; -public class BulkWriteValidator implements AutoCloseable +public class BulkWriteValidator { private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteValidator.class); private final ReplicaAwareFailureHandler<RingInstance> failureHandler; - private final CassandraRingMonitor monitor; private final JobInfo job; private String phase = "Initializing"; private final ClusterInfo cluster; public BulkWriteValidator(BulkWriterContext bulkWriterContext, - Consumer<CancelJobEvent> cancelJobFunc) throws Exception + ReplicaAwareFailureHandler<RingInstance> failureHandler) { cluster = bulkWriterContext.cluster(); job = bulkWriterContext.job(); - failureHandler = new ReplicaAwareFailureHandler<>(cluster.getRing(true)); - monitor = new CassandraRingMonitor(cluster, cancelJobFunc, 1000, TimeUnit.MILLISECONDS); + this.failureHandler = failureHandler; } - public void setPhase(String phase) - { - this.phase = phase; - } - - public String getPhase() - { - return phase; - } - - public void validateInitialEnvironment() - { - validateCLOrFail(); - } - - public void close() - { - monitor.stop(); - } - - private void validateCLOrFail() - { - updateInstanceAvailability(); - validateClOrFail(failureHandler, LOGGER, phase, job); - } - - public static void validateClOrFail(ReplicaAwareFailureHandler<RingInstance> failureHandler, + public static void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping, + ReplicaAwareFailureHandler<RingInstance> failureHandler, Logger logger, String phase, JobInfo job) { Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<RingInstance, String>>> failedRanges = - failureHandler.getFailedEntries(job.getConsistencyLevel(), job.getLocalDC()); + failureHandler.getFailedEntries(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC()); + if (failedRanges.isEmpty()) { logger.info("Succeeded {} with {}", phase, job.getConsistencyLevel()); } else { - String message = String.format("Failed to load %s ranges with %s for job %s in phase %s", + String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.", failedRanges.size(), job.getConsistencyLevel(), job.getId(), phase); logger.error(message); - failedRanges.forEach(failedRange -> failedRange.getValue().keySet().forEach(instance -> - logger.error("Failed {} for {} on {}", phase, failedRange.getKey(), instance.toString()))); + failedRanges.forEach(failedRange -> + failedRange.getValue() + .keySet() + .forEach(instance -> + logger.error("Failed {} for {} on {}", + phase, + failedRange.getKey(), + instance.toString()))); throw new RuntimeException(message); } } - public static void updateFailureHandler(CommitResult commitResult, - String phase, + public String getPhase() + { + return phase; + } + + public void setPhase(String phase) + { + this.phase = phase; + } + + public static void updateFailureHandler(CommitResult commitResult, String phase, ReplicaAwareFailureHandler<RingInstance> 
failureHandler) { LOGGER.debug("Commit Result: {}", commitResult); commitResult.failures.forEach((uuid, err) -> { LOGGER.warn("[{}]: {} failed on {} with message {}", - uuid, phase, commitResult.instance.getNodeName(), err.errMsg); + uuid, + phase, + commitResult.instance.getNodeName(), + err.errMsg); failureHandler.addFailure(err.tokenRange, commitResult.instance, err.errMsg); }); } + public void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping) + { + // Updates failures by looking up instance metadata + updateInstanceAvailability(); + // Fails if the failures violate consistency requirements + validateClOrFail(tokenRangeMapping, failureHandler, LOGGER, phase, job); + } + private void updateInstanceAvailability() { cluster.refreshClusterInfo(); Map<RingInstance, InstanceAvailability> availability = cluster.getInstanceAvailability(); availability.forEach(this::checkInstance); } + private void updateFailureHandler(RingInstance instance, InstanceAvailability availability) Review Comment: This method is only called in `private void checkInstance(RingInstance instance, InstanceAvailability availability)`, which throws an exception when `availability == InstanceAvailability.INVALID_STATE`. That removes the need to check the condition again in this method. ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinator.java: ########## @@ -128,61 +128,51 @@ private Stream<ListenableFuture<CommitResult>> commit(Map<RingInstance, Listenin RingInstance instance, Map<String, Range<BigInteger>> uploadRanges) { - if (cluster.instanceIsAvailable(instance)) Review Comment: What if the instance is down? The new code will try to commit the sstable on instances that are down. Should it skip contacting the sidecar and mark it as a failure directly? ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java: ########## @@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult streamResult) throws ExecutionExc @VisibleForTesting List<RingInstance> getReplicas() { - Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = ring.getSubRanges(tokenRange).asMapOfRanges(); + List<RingInstance> exclusions = failureHandler.getFailedInstances(); + final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges(); Review Comment: drop the `final` for the locally scoped variable. ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java: ########## @@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange, Instance casInstance, Strin for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet()) { Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue()); - newErrorMap.put(casInstance, errMessage); mappingsToAdd.put(entry.getKey(), newErrorMap); } failedRangesMap.putAll(mappingsToAdd); } - public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC) + public List<Instance> getFailedInstances() Review Comment: How about returning a set instead of a list?
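On the set-versus-list point, a hedged sketch of what the suggested signature could look like, assuming `getFailedInstances()` derives its result from `failedRangesMap` (the method body is not shown in this hunk):

```java
// Returning a Set both de-duplicates instances that failed on several
// ranges and makes the exclusion semantics explicit to callers.
public Set<Instance> getFailedInstances()
{
    return failedRangesMap.asMapOfRanges().values().stream()
                          .flatMap(errorMap -> errorMap.keySet().stream())
                          .collect(Collectors.toSet());
}
```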
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ########## @@ -19,19 +19,42 @@ package org.apache.cassandra.spark.bulkwriter.token; -import java.util.Collection; +import java.util.Set; -import com.google.common.base.Preconditions; - -import org.apache.cassandra.spark.common.model.CassandraInstance; -import org.apache.cassandra.spark.data.ReplicationFactor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public interface ConsistencyLevel { boolean isLocal(); - boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts, ReplicationFactor replicationFactor, String localDC); - + Logger LOGGER = LoggerFactory.getLogger(ConsistencyLevel.class); + + /** + * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor. + * <pre> + * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes. + * This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy. + * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy + * (and non-pending) nodes. + * + * For both the above cases, blocked instances are also considered as failures while performing consistency checks. + * Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances. + * This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes. + * </pre> + * + * TODO javadocs Review Comment: Is it addressed? 
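For reference, the quorum rule the javadoc describes can be stated compactly. A hedged sketch (method name and parameters are illustrative, not from the PR):

```java
// QUORUM-style check against the write-replica-set: after discounting failed
// and blocked instances, at least a quorum of write replicas must remain.
static boolean quorumMaintained(int writeReplicaCount, int failedOrBlockedCount)
{
    int quorum = writeReplicaCount / 2 + 1;
    return writeReplicaCount - failedOrBlockedCount >= quorum;
}
```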
########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ########## @@ -69,64 +78,217 @@ public class RecordWriterTest @BeforeEach public void setUp() { - tw = new MockTableWriter(folder); - ring = RingUtils.buildRing(0, "DC1", "test", 12); - writerContext = new MockBulkWriterContext(ring); + tw = new MockTableWriter(folder.getRoot()); + tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12); + writerContext = new MockBulkWriterContext(tokenRangeMapping); tc = new TestTaskContext(); range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId()); tokenizer = new Tokenizer(writerContext); } @Test - public void testSuccessfulWrite() + public void testWriteFailWhenTopologyChangeWithinTask() + { + // Generate token range mapping to simulate node movement of the first node by assigning it a different token + // within the same partition + int moveTargetToken = 50000; + TokenRangeMapping<RingInstance> testMapping = + TokenRangeMappingUtils.buildTokenRangeMapping(100000, + ImmutableMap.of("DC1", 3), + 12, + true, + moveTargetToken); + + MockBulkWriterContext m = Mockito.spy(writerContext); + rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + + when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); + assertThat(ex.getMessage(), endsWith("Token range mappings have changed since the task started")); + } + + @Test + public void testWriteWithExclusions() + { + TokenRangeMapping<RingInstance> testMapping = + TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(100000, + ImmutableMap.of("DC1", 3), + 12); + + MockBulkWriterContext m = Mockito.spy(writerContext); + rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + + when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); + when(m.getInstanceAvailability()).thenCallRealMethod(); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + rw.write(data); + Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); + assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas + } + + @Test + public void testSuccessfulWrite() throws InterruptedException { Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @Test - public void testWriteWithConstantTTL() + public void testSuccessfulWriteCheckUploads() { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + rw.write(data); + Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); + assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas + assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES)); + List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); + for (UploadRequest ur : requests) + { + assertNotNull(ur.fileHash); + } + } + + @Test + public void testWriteWithConstantTTL() throws InterruptedException + { + MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false); validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); } @Test - public void testWriteWithTTLColumn() + public void testWriteWithTTLColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false); - String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"}; + String[] columnNamesWithTtl = + { + "id", "date", "course", "marks", "ttl" + }; validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl); } @Test - public void testWriteWithConstantTimestamp() + public void testWriteWithConstantTimestamp() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false); validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); } @Test - public void testWriteWithTimestampColumn() + public void testWriteWithTimestampColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true); - String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"}; + String[] columnNamesWithTimestamp = + { + "id", "date", "course", "marks", "timestamp" + }; validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp); } @Test - public void testWriteWithTimestampAndTTLColumn() + public void testWriteWithTimestampAndTTLColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); + MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true); - String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"}; + String[] columnNames = + { + "id", "date", "course", "marks", "ttl", "timestamp" + }; validateSuccessfulWrite(bulkWriterContext, data, columnNames); } + @Test + public void testWriteWithSubRanges() + { + MockBulkWriterContext m = Mockito.spy(writerContext); + TokenPartitioner mtp = Mockito.mock(TokenPartitioner.class); + when(m.job().getTokenPartitioner()).thenReturn(mtp); + + // Override partition's token range to span across ranges to force a split into sub-ranges + Range<BigInteger> overlapRange = Range.closed(BigInteger.valueOf(-9223372036854775808L), BigInteger.valueOf(200000)); + when(mtp.getTokenRange(anyInt())).thenReturn(overlapRange); + + rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + List<StreamResult> res = rw.write(data); + assertEquals(1, res.size()); + assertNotEquals(overlapRange, res.get(0).tokenRange); + final Map<CassandraInstance, List<UploadRequest>> uploads = m.getUploads(); + // Should upload to 3 replicas + assertEquals(uploads.keySet().size(), REPLICA_COUNT); + assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * 
UPLOADED_TABLES)); + List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); + for (UploadRequest ur : requests) + { + assertNotNull(ur.fileHash); + } + } + + @Test + public void testWriteWithSubRanges2() Review Comment: can the test name be more descriptive? If a comment helps, please add one too. ########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java: ########## @@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws NoSuchElementException * @return instance meta information * @throws NoSuchElementException when the instance for {@code host} does not exist */ + @Override public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException { - return cassandraTestContext.instancesConfig.instanceFromHost(host); + return cassandraTestContext.instancesConfig().instanceFromHost(host); } } @Provides @Singleton public SidecarConfiguration sidecarConfiguration() { - return new SidecarConfigurationImpl(new ServiceConfigurationImpl("127.0.0.1")); + ServiceConfiguration conf = ServiceConfigurationImpl.builder() + .host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long Review Comment: Is it required to bind to `0.0.0.0`? ########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java: ########## @@ -82,6 +106,7 @@ public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(final int i Collections.emptySet()); } + Review Comment: nit: remove duplicated empty lines ########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ########## @@ -346,19 +366,22 @@ void writeBuffered() private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator<Tuple2<DecoratedKey, Object[]>> data, - String[] columnNames) + String[] columnNames) throws InterruptedException { validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES); } private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator<Tuple2<DecoratedKey, Object[]>> data, String[] columnNames, - int uploadedTables) + int uploadedTables) throws InterruptedException { RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new); rw.write(data); + // Wait for uploads to finish + Thread.sleep(500); Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); + System.out.println("Uploads in test: " + uploads.values().stream().mapToInt(List::size).sum()); Review Comment: remove it or use a logger.
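If the diagnostic is worth keeping, a debug-level log line avoids polluting test output. A minimal sketch, assuming the test class declares (or gains) an SLF4J logger:

```java
// Assumes: private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriterTest.class);
LOGGER.debug("Uploads in test: {}", uploads.values().stream().mapToInt(List::size).sum());
```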
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java: ########## @@ -98,39 +95,19 @@ public void insert(@NotNull Dataset<Row> data, boolean overwrite) Tokenizer tokenizer = new Tokenizer(writerContext); TableSchema tableSchema = writerContext.schema().getTableSchema(); JavaPairRDD<DecoratedKey, Object[]> sortedRDD = data.toJavaRDD() - .map(Row::toSeq) - .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray()) - .map(tableSchema::normalize) - .keyBy(tokenizer::getDecoratedKey) - .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner()); + .map(Row::toSeq) + .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray()) + .map(tableSchema::normalize) + .keyBy(tokenizer::getDecoratedKey) + .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner()); Review Comment: The indentation was deliberate, for the sake of readability. The reformatting makes no other change than increasing the indentation. Such reformatting is not preferred, as it creates noise for review. ########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ########## @@ -346,19 +366,22 @@ void writeBuffered() private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator<Tuple2<DecoratedKey, Object[]>> data, - String[] columnNames) + String[] columnNames) throws InterruptedException { validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES); } private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator<Tuple2<DecoratedKey, Object[]>> data, String[] columnNames, - int uploadedTables) + int uploadedTables) throws InterruptedException { RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new); rw.write(data); + // Wait for uploads to finish + Thread.sleep(500); Review Comment: Using `sleep` makes the test prone to flakiness, since the outcome depends heavily on the test runtime; we should do our best to avoid it. The `rw.write` call is already blocking — it blocks until the sstables are uploaded — so I am not sure why the sleep is needed. ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings.
0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. + return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. 
We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. 
If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API + private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC) { - return new RingInstance(ringEntry); + Set<String> readReplicas = readReplicasFromTokenRangeResponse(response); + return writeReplicasByDC.entrySet() + .stream() + .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - protected GossipInfoResponse getGossipInfo(boolean forceRefresh) + private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas, + List<ReplicaMetadata> replicaMetadata) { - GossipInfoResponse currentGossipInfo = gossipInfo; - if (!forceRefresh && currentGossipInfo != null) - { - return currentGossipInfo; - } - - synchronized (this) + Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create(); + for (ReplicaInfo rInfo : writeReplicas) { - if (forceRefresh || gossipInfo == null) + Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end())); + for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet()) { - try - { - gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(), - TimeUnit.MILLISECONDS); - } - catch (ExecutionException | InterruptedException exception) - { - LOGGER.error("Failed to retrieve gossip information"); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } - catch (TimeoutException exception) - { - Thread.currentThread().interrupt(); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } + // For each writeReplica, get metadata and update map to include range + dcReplicaEntry.getValue().forEach(ipAddress -> { + // Get metadata for this IP; Create RingInstance + // TODO: Temporary change to extract IP from 'ip:port' string. THis will go oway once + // corresponding change in sidecar is merged. + ReplicaMetadata replica = replicaMetadata.stream() + .filter(r -> + r.address().equals(ipAddress.split(":")[0])) Review Comment: what if it is an IPv6 address? 
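To make the IPv6 concern concrete: `ipAddress.split(":")[0]` truncates an IPv6 address such as `2001:db8::1:9042` at its first colon. A hedged sketch of a safer interim fix, assuming every entry carries a trailing `:port` suffix (helper name is illustrative, not from the PR):

```java
// Strip only the trailing ":port"; everything before the last colon is the address.
// Still simplistic (no bracket handling), but it does not mangle IPv6 literals.
private static String stripPortSuffix(String addressAndPort)
{
    int lastColon = addressAndPort.lastIndexOf(':');
    return lastColon < 0 ? addressAndPort : addressAndPort.substring(0, lastColon);
}
```

The lookup would then compare `r.address().equals(stripPortSuffix(ipAddress))` instead of splitting on the first colon.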
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. 
+ return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. 
If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API Review Comment: Replica metadata is exposed from the `TokenRangeReplicasResponse` now. Is the implementation outdated? ########## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.analytics; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; + +import com.datastax.driver.core.ConsistencyLevel; +import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations; +import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient; +import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.spark.KryoRegister; +import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.cassandra.spark.bulkwriter.DecoratedKey; +import org.apache.cassandra.spark.bulkwriter.Tokenizer; +import org.apache.cassandra.spark.common.schema.ColumnType; +import org.apache.cassandra.spark.common.schema.ColumnTypes; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import 
org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; +import scala.Tuple2; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for resiliency tests. Contains helper methods for data generation and validation + */ +public abstract class ResiliencyTestBase extends IntegrationTestBase +{ + private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));"; + protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s"; + public static final int rowCount = 1000; + + public QualifiedTableName initializeSchema() + { + return initializeSchema(ImmutableMap.of("datacenter1", 1)); + } + + public QualifiedTableName initializeSchema(Map<String, Integer> rf) + { + createTestKeyspace(rf); + return createTestTable(createTableStmt); + } + + public SparkConf generateSparkConf() + { + SparkConf sparkConf = new SparkConf() + .setAppName("Integration test Spark Cassandra Bulk Reader Job") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.master", "local[8,4]"); + BulkSparkConf.setupSparkConf(sparkConf, true); + KryoRegister.setup(sparkConf); + return sparkConf; + } + + public SparkSession generateSparkSession(SparkConf sparkConf) + { + return SparkSession.builder() + .config(sparkConf) + .getOrCreate(); + } + + public Set<String> getDataForRange(Range<BigInteger> range) + { + // Iterate through all data entries; filter only entries that belong to range; convert to strings + return generateExpectedData().stream() + .filter(t -> range.contains(t._1().getToken())) + .map(t -> t._2()[0] + ":" + t._2()[1] + ":" + t._2()[2]) + .collect(Collectors.toSet()); + } + + public List<Tuple2<DecoratedKey, Object[]>> generateExpectedData() + { + // "create table if not exists %s (id int, course text, marks int, primary key (id));"; + List<ColumnType<?>> columnTypes = Arrays.asList(ColumnTypes.INT); + Tokenizer tokenizer = new Tokenizer(Arrays.asList(0), + Arrays.asList("id"), + columnTypes, + true + ); + return IntStream.range(0, rowCount).mapToObj(recordNum -> { + Object[] columns = new Object[] + { + recordNum, "course" + recordNum, recordNum + }; + return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns); + }).collect(Collectors.toList()); + } + + public Map<IUpgradeableInstance, Set<String>> getInstanceData(List<IUpgradeableInstance> instances, + boolean isPending) + { + + return instances.stream().collect(Collectors.toMap(Function.identity(), + i -> filterTokenRangeData(getRangesForInstance(i, isPending)))); + } + + public Set<String> filterTokenRangeData(List<Range<BigInteger>> ranges) + { + return ranges.stream() + .map(r -> getDataForRange(r)) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + private List<Range<BigInteger>> getRangesForInstance(IUpgradeableInstance instance, boolean isPending) + { + IInstanceConfig config = instance.config(); + JmxClient client = JmxClient.builder() + .host(config.broadcastAddress().getAddress().getHostAddress()) + .port(config.jmxPort()) + 
.build(); + StorageJmxOperations ss = client.proxy(StorageJmxOperations.class, "org.apache.cassandra.db:type=StorageService"); + + Map<List<String>, List<String>> ranges = isPending ? ss.getPendingRangeToEndpointWithPortMap(TEST_KEYSPACE) + : ss.getRangeToEndpointWithPortMap(TEST_KEYSPACE); + + // filter ranges that belong to the instance + return ranges.entrySet() + .stream() + .filter(e -> e.getValue().contains(instance.broadcastAddress().getAddress().getHostAddress() + + ":" + instance.broadcastAddress().getPort())) + .map(e -> unwrapRanges(e.getKey())) + .flatMap(Collection::stream) + .collect(Collectors.toList()); Review Comment: not aligned ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. 
+ return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. 
If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API + private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC) { - return new RingInstance(ringEntry); + Set<String> readReplicas = readReplicasFromTokenRangeResponse(response); + return writeReplicasByDC.entrySet() + .stream() + .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - protected GossipInfoResponse getGossipInfo(boolean forceRefresh) + private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas, + List<ReplicaMetadata> replicaMetadata) { - GossipInfoResponse currentGossipInfo = gossipInfo; - if (!forceRefresh && currentGossipInfo != null) - { - return currentGossipInfo; - } - - synchronized (this) + Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create(); + for (ReplicaInfo rInfo : writeReplicas) { - if (forceRefresh || gossipInfo == null) + Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end())); + for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet()) { - try - { - gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(), - TimeUnit.MILLISECONDS); - } - catch (ExecutionException | InterruptedException exception) - { - LOGGER.error("Failed to retrieve gossip information"); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } - catch (TimeoutException exception) - { - Thread.currentThread().interrupt(); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } + // For each writeReplica, get metadata and update map to include range + dcReplicaEntry.getValue().forEach(ipAddress -> { + // Get metadata for this IP; Create RingInstance + // TODO: Temporary change to extract IP from 'ip:port' string. THis will go oway once + // corresponding change in sidecar is merged. Review Comment: Is the todo resolved now? 
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. 
+ return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. 
If this is included, remove blockedInstances from CL checks
+ merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+ merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet()));
+
+ return merged;
}
- private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry)
+ // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set
+ // This will be replaced by the instance state metadata when it is supported by the token-ranges API
+ private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
{
- return new RingInstance(ringEntry);
+ Set<String> readReplicas = readReplicasFromTokenRangeResponse(response);
+ return writeReplicasByDC.entrySet()
+ .stream()
+ .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains))

Review Comment: If my understanding is correct, the method is meant to find the pending nodes. If so, the filter, which only keeps a DC's instance list when none of those instances is contained in `readReplicas`, always returns `false`. The predicate feels wrong to me; a per-instance sketch follows after this block.

########## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningBaseTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.expansion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestTokenSupplier;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class JoiningBaseTest extends ResiliencyTestBase

Review Comment: should this be an abstract class?
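Returning to the `getPendingReplicas` comment above: a per-instance variant might look like the following. This is only a sketch against the same `Map<String, Set<String>>` shapes shown in the diff (assuming the usual java.util imports), not a drop-in fix; `pendingByDc` is a hypothetical name:

```java
// Sketch: collect, per DC, the write replicas that are not read replicas,
// and keep only DCs that actually have pending instances.
private Map<String, Set<String>> pendingByDc(Set<String> readReplicas,
                                             Map<String, Set<String>> writeReplicasByDC)
{
    Map<String, Set<String>> pending = new HashMap<>();
    writeReplicasByDC.forEach((dc, replicas) -> {
        Set<String> dcPending = replicas.stream()
                                        .filter(replica -> !readReplicas.contains(replica))
                                        .collect(Collectors.toSet());
        if (!dcPending.isEmpty())
        {
            pending.put(dc, dcPending);
        }
    });
    return pending;
}
```

On the separate abstract-class question, declaring `public abstract class JoiningBaseTest extends ResiliencyTestBase` would also prevent the base class from being picked up as a runnable test on its own.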
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -175,30 +324,31 @@ public void writeRow(Map<String, Object> valueMap, } } - void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException + /** + * Stream to replicas; if batchSize is reached, "finalize" SST to "schedule" streamSession + */ + private void checkBatchSize(final StreamSession streamSession, final int partitionId, final JobInfo job) throws IOException Review Comment: drop unnecessary `final` in the method parameters. ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -136,35 +181,139 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato } } + private Map<Range<BigInteger>, List<RingInstance>> taskTokenRangeMapping(TokenRangeMapping<RingInstance> tokenRange, + Range<BigInteger> taskTokenRange) + { + return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges(); + } + + private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingInstance>> mapping) + { + return mapping.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + } + + /** + * Creates a new session if we have the current token range intersecting the ranges from write replica-set. + * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range + * if the token from the row data belongs to the range. + */ + private StreamSession maybeCreateStreamSession(TaskContext taskContext, + StreamSession streamSession, + Tuple2<DecoratedKey, Object[]> rowData, + Set<Range<BigInteger>> subRanges, + ReplicaAwareFailureHandler<RingInstance> failureHandler, + List<StreamResult> results) + throws IOException, ExecutionException, InterruptedException + { + BigInteger token = rowData._1().getToken(); + Range<BigInteger> tokenRange = getTokenRange(taskContext); + + Preconditions.checkState(tokenRange.contains(token), + String.format("Received Token %s outside of expected range %s", token, tokenRange)); + + // We have split ranges likely resulting from pending nodes + // Evaluate creating a new session if the token from current row is part of a sub-range + if (subRanges.size() > 1) + { + // Create session using sub-range that contains the token from current row + Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); + Preconditions.checkState(matchingSubRange != null, + String.format("Received Token %s outside of expected range %s", token, matchingSubRange)); + streamSession = maybeCreateSubRangeSession(taskContext, streamSession, failureHandler, results, matchingSubRange); + } + + // If we do not have any stream session at this point, we create a session using the partition's token range + return (streamSession == null) ? createStreamSession(taskContext) : streamSession; + } + + /** + * Given that the token belongs to a sub-range, creates a new stream session if either + * 1) we do not have an existing stream session, or 2) the existing stream session corresponds to a range that + * does NOT match the sub-range the token belongs to. 
+ */ + private StreamSession maybeCreateSubRangeSession(TaskContext taskContext, + StreamSession streamSession, + ReplicaAwareFailureHandler<RingInstance> failureHandler, + List<StreamResult> results, + Range<BigInteger> matchingSubRange) + throws IOException, ExecutionException, InterruptedException + { + if (streamSession == null || streamSession.getTokenRange() != matchingSubRange) + { + LOGGER.debug("[{}] Creating stream session for range: {}", taskContext.partitionId(), matchingSubRange); + // Schedule data to be sent if we are processing a batch that has not been scheduled yet. + if (streamSession != null) + { + // Complete existing batched writes (if any) before the existing stream session is closed + if (batchSize != 0) + { + finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize); + sstableWriter = null; + batchSize = 0; + } + results.add(streamSession.close()); + } + streamSession = new StreamSession(writerContext, getStreamId(taskContext), matchingSubRange, failureHandler); + } + return streamSession; + } + + /** + * Get ranges from the set that intersect and/or overlap with the provided token range + */ + private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges, Range<BigInteger> tokenRange) + { + return ranges.stream() + .filter(r -> r.isConnected(tokenRange) && !r.intersection(tokenRange).isEmpty()) + .collect(Collectors.toSet()); + } + + private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<RingInstance> startTaskMapping, TaskContext taskContext) + { + Range<BigInteger> taskTokenRange = getTokenRange(taskContext); + // Get the uncached, current view of the ring to compare with initial ring + TokenRangeMapping<RingInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false); + Map<Range<BigInteger>, List<RingInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange); + Map<Range<BigInteger>, List<RingInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange); + + // Token ranges are identical and overall instance list is same + return !(startMapping.keySet().equals(endMapping.keySet()) && + instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping))); + } + private void validateAcceptableTimeSkewOrThrow(List<RingInstance> replicas) { - TimeSkewResponse timeSkewResponse = writerContext.cluster().getTimeSkew(replicas); - Instant localNow = Instant.now(); - Instant remoteNow = Instant.ofEpochMilli(timeSkewResponse.currentTime); - Duration range = Duration.ofMinutes(timeSkewResponse.allowableSkewInMinutes); - if (localNow.isBefore(remoteNow.minus(range)) || localNow.isAfter(remoteNow.plus(range))) + if (!replicas.isEmpty()) { - String message = String.format("Time skew between Spark and Cassandra is too large. " - + "Allowable skew is %d minutes. " - + "Spark executor time is %s, Cassandra instance time is %s", - timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow); - throw new UnsupportedOperationException(message); + TimeSkewResponse timeSkewResponse = writerContext.cluster().getTimeSkew(replicas); + Instant localNow = Instant.now(); + Instant remoteNow = Instant.ofEpochMilli(timeSkewResponse.currentTime); + Duration range = Duration.ofMinutes(timeSkewResponse.allowableSkewInMinutes); + if (localNow.isBefore(remoteNow.minus(range)) || localNow.isAfter(remoteNow.plus(range))) + { + final String message = String.format("Time skew between Spark and Cassandra is too large. " + + "Allowable skew is %d minutes. 
Spark executor time is %s, Cassandra instance time is %s", + timeSkewResponse.allowableSkewInMinutes, localNow, remoteNow); + throw new UnsupportedOperationException(message); + } Review Comment: looks like it only adds the check for whether `replicas` is empty. The following has less indentation. ```java private void validateAcceptableTimeSkewOrThrow(List<RingInstance> replicas) { if (replicas.isEmpty()) { return; } // original logic } ``` ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ########## @@ -110,20 +133,42 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato Map<String, Object> valueMap = new HashMap<>(); try { + Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() + .stream() + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); Review Comment: nit: this is simpler to get a copy of the key set ```suggestion Set<Range<BigInteger>> newRanges = new HashSet<>(initialTokenRangeMapping.getRangeMap().asMapOfRanges().keySet()); ``` ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ########## @@ -339,140 +336,190 @@ public String getLowestCassandraVersion() return cassandraVersion; } - public String getVersionFromFeature() + @Override + public Map<RingInstance, InstanceAvailability> getInstanceAvailability() { - return null; + TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true); + Map<RingInstance, InstanceAvailability> result = + mapping.getReplicaMetadata() + .stream() + .map(RingInstance::new) + .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability)); + + if (LOGGER.isDebugEnabled()) + { + result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail)); + } + return result; } - protected List<NodeSettings> getAllNodeSettings() + private InstanceAvailability determineInstanceAvailability(RingInstance instance) { - List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures, - conf.getSidecarRequestMaxRetryDelayInSeconds(), - TimeUnit.SECONDS); - - if (allNodeSettings.isEmpty()) + if (!instanceIsUp(instance.getRingInstance())) { - throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.", - allNodeSettingFutures.size())); + return InstanceAvailability.UNAVAILABLE_DOWN; } - else if (allNodeSettings.size() < allNodeSettingFutures.size()) + if (instanceIsBlocked(instance)) { - LOGGER.warn("{}/{} instances were used to determine the node settings", - allNodeSettings.size(), allNodeSettingFutures.size()); + return InstanceAvailability.UNAVAILABLE_BLOCKED; } - - return allNodeSettings; - } - - public String getVersionFromSidecar() - { - NodeSettings nodeSettings = this.nodeSettings.get(); - if (nodeSettings != null) + if (instanceIsNormal(instance.getRingInstance()) || + instanceIsTransitioning(instance.getRingInstance()) || + instanceIsBeingReplaced(instance.getRingInstance())) { - return nodeSettings.releaseVersion(); + return InstanceAvailability.AVAILABLE; } - return getLowestVersion(getAllNodeSettings()); + LOGGER.info("No valid state found for instance {}", instance); + // If it's not one of the above, it's inherently INVALID. 
+ return InstanceAvailability.INVALID_STATE; } - protected RingResponse getRingResponse() + private TokenRangeMapping<RingInstance> getTokenRangeReplicas() { - RingResponse currentRingResponse = ringResponse; - if (currentRingResponse != null) + Map<String, Set<String>> writeReplicasByDC; + Map<String, Set<String>> pendingReplicasByDC; + List<ReplicaMetadata> replicaMetadata; + Set<RingInstance> blockedInstances; + Set<RingInstance> replacementInstances; + Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance; + try { - return currentRingResponse; - } + TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets(); + replicaMetadata = response.replicaMetadata(); - synchronized (this) - { - if (ringResponse == null) + tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata()); + LOGGER.info("Retrieved token ranges for {} instances from write replica set ", + tokenRangesByInstance.size()); + + replacementInstances = response.replicaMetadata() + .stream() + .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString())) + .map(RingInstance::new) + .collect(Collectors.toSet()); + + blockedInstances = response.replicaMetadata().stream() + .map(RingInstance::new) + .filter(this::instanceIsBlocked) + .collect(Collectors.toSet()); + + Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address()) + .collect(Collectors.toSet()); + + // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC + writeReplicasByDC = response.writeReplicas() + .stream() + .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()), + (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps))); + + pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC); + + if (LOGGER.isDebugEnabled()) { - try - { - ringResponse = getCurrentRingResponse(); - } - catch (Exception exception) - { - LOGGER.error("Failed to load Cassandra ring", exception); - throw new RuntimeException(exception); - } + LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}", + writeReplicasByDC.keySet(), + writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(), + pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size()); } - return ringResponse; } - } + catch (ExecutionException | InterruptedException exception) + { + LOGGER.error("Failed to get token ranges, ", exception); + throw new RuntimeException(exception); + } - private RingResponse getCurrentRingResponse() throws Exception - { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + // Include availability info so CL checks can use it to exclude replacement hosts + return new TokenRangeMapping<>(getPartitioner(), + getReplicationFactor(), + writeReplicasByDC, + pendingReplicasByDC, + tokenRangesByInstance, + replicaMetadata, + blockedInstances, + replacementInstances); } - private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) + private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs) { - return ringResponse.stream() - .map(RingInstance::new) - .collect(Collectors.toList()); + Set<String> merged = new HashSet<>(); + // Removes blocked instances. 
If this is included, remove blockedInstances from CL checks + merged.addAll(instancesList1.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + merged.addAll(instancesList2.stream().filter(i -> !blockedIPs.contains(i)).collect(Collectors.toSet())); + + return merged; } - private static RingInstance getCasInstanceMethodsImpl(RingEntry ringEntry) + // Pending replicas are currently calculated by extracting the non-read-replicas from the write-replica-set + // This will be replaced by the instance state metadata when it is supported by the token-ranges API + private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC) { - return new RingInstance(ringEntry); + Set<String> readReplicas = readReplicasFromTokenRangeResponse(response); + return writeReplicasByDC.entrySet() + .stream() + .filter(entry -> entry.getValue().stream().noneMatch(readReplicas::contains)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - protected GossipInfoResponse getGossipInfo(boolean forceRefresh) + private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas, + List<ReplicaMetadata> replicaMetadata) { - GossipInfoResponse currentGossipInfo = gossipInfo; - if (!forceRefresh && currentGossipInfo != null) - { - return currentGossipInfo; - } - - synchronized (this) + Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create(); + for (ReplicaInfo rInfo : writeReplicas) { - if (forceRefresh || gossipInfo == null) + Range<BigInteger> range = Range.openClosed(new BigInteger(rInfo.start()), new BigInteger(rInfo.end())); + for (Map.Entry<String, List<String>> dcReplicaEntry : rInfo.replicasByDatacenter().entrySet()) { - try - { - gossipInfo = cassandraContext.getSidecarClient().gossipInfo().get(conf.getHttpResponseTimeoutMs(), - TimeUnit.MILLISECONDS); - } - catch (ExecutionException | InterruptedException exception) - { - LOGGER.error("Failed to retrieve gossip information"); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } - catch (TimeoutException exception) - { - Thread.currentThread().interrupt(); - throw new RuntimeException("Failed to retrieve gossip information", exception); - } + // For each writeReplica, get metadata and update map to include range + dcReplicaEntry.getValue().forEach(ipAddress -> { + // Get metadata for this IP; Create RingInstance + // TODO: Temporary change to extract IP from 'ip:port' string. THis will go oway once + // corresponding change in sidecar is merged. + ReplicaMetadata replica = replicaMetadata.stream() + .filter(r -> + r.address().equals(ipAddress.split(":")[0])) + .findFirst().get(); Review Comment: can you handle the potential NoSuchElementException from `get()` gracefully? 
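One way to handle it gracefully, along the lines of the `Optional` plus `checkState` suggestion made earlier in this review (a sketch, assuming Guava's `Preconditions` as already used in the diff):

```java
// Sketch: fail with a descriptive message instead of a bare NoSuchElementException
Optional<ReplicaMetadata> replica =
        replicaMetadata.stream()
                       .filter(r -> r.address().equals(ipAddress.split(":")[0]))
                       .findFirst();
Preconditions.checkState(replica.isPresent(),
                         "No replica metadata found for address %s", ipAddress);
ReplicaMetadata metadata = replica.get();
```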
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java:
##########
@@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult streamResult) throws ExecutionExc
@VisibleForTesting
List<RingInstance> getReplicas()
{
- Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = ring.getSubRanges(tokenRange).asMapOfRanges();
+ List<RingInstance> exclusions = failureHandler.getFailedInstances();
+ final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
- Preconditions.checkState(overlappingRanges.keySet().size() == 1,
- String.format("Partition range %s is mapping more than one range %s",
- tokenRange, overlappingRanges));
+ LOGGER.debug("[{}]: Stream session token range: {} overlaps with ring ranges: {}", sessionID, tokenRange, overlappingRanges);
+ Preconditions.checkState(!tokenRange.isEmpty(),

Review Comment: maybe move this check before getting the `overlappingRanges`, so that `tokenRange` is validated before it is used. Or do you mean to check that `overlappingRanges` is not empty?

########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
// In order to best utilize the number of Spark cores while minimizing the number of commit calls,
// we calculate the number of splits that will just match or exceed the total number of available Spark cores.
- // NOTE: The actual number of partitions that result from this should always be at least
- // the number of token ranges times the number of splits, but can be slightly more.
- public int calculateSplits(CassandraRing<RingInstance> ring,
+ // Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits,
+ // but can be slightly more.
+ public int calculateSplits(TokenRangeMapping<RingInstance> tokenRangeMapping,
Integer numberSplits,
int defaultParallelism,
Integer cores)
{
- if (numberSplits >= 0)
+ if (numberSplits != -1)
{
return numberSplits;
}
- int tasksToRun = Math.max(cores, defaultParallelism);
- Map<Range<BigInteger>, List<RingInstance>> rangeListMap = ring.getRangeMap().asMapOfRanges();
+ final int tasksToRun = Math.max(cores, defaultParallelism);
+ final Map<Range<BigInteger>, List<RingInstance>> rangeListMap = tokenRangeMapping.getRangeMap().asMapOfRanges();

Review Comment: drop the `final`

########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -138,15 +143,15 @@ private void validate()
private void validateRangesDoNotOverlap()
{
List<Range<BigInteger>> sortedRanges = partitionMap.asMapOfRanges().keySet().stream()
- .sorted(Comparator.comparing(Range::lowerEndpoint))
- .collect(Collectors.toList());
+ .sorted(Comparator.comparing(Range::lowerEndpoint))
+ .collect(Collectors.toList());
Range<BigInteger> previous = null;
for (Range<BigInteger> current : sortedRanges)
{
if (previous != null)
{
Preconditions.checkState(!current.isConnected(previous) || current.intersection(previous).isEmpty(),
- String.format("Two ranges in partition map are overlapping %s %s", previous, current));
+ String.format("Two ranges in partition map are overlapping %s %s", previous, current));

Review Comment: since this line is changed anyway,
you can use the other `checkState` API to lazily evaluate the error message only when the check fails.
```suggestion
"Two ranges in partition map are overlapping %s %s", previous, current);
```

########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##########
@@ -59,42 +87,34 @@ public boolean isLocal()
}
@Override
- public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts,
- ReplicationFactor replicationFactor,
+ public boolean checkConsistency(Set<String> writeReplicas,
+ Set<String> pendingReplicas,
+ Set<String> replacementInstances,
+ Set<String> blockedInstances,
+ Set<String> failedInstanceIps,
String localDC)
{
- Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy,
- "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces");
-
- for (String datacenter : replicationFactor.getOptions().keySet())
- {
- int rf = replicationFactor.getOptions().get(datacenter);
- if (failedInsts.stream()
- .filter(instance -> instance.getDataCenter().matches(datacenter))
- .count() > (rf - (rf / 2 + 1)))
- {
- return false;
- }
- }
-
- return true;
+ return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));

Review Comment: The implementation looks wrong to me. For EACH_QUORUM, the success condition is to meet a quorum in each DC, and the replication factor can vary per DC. See the sketch after this block.

########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##########
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
// In order to best utilize the number of Spark cores while minimizing the number of commit calls,
// we calculate the number of splits that will just match or exceed the total number of available Spark cores.
- // NOTE: The actual number of partitions that result from this should always be at least
- // the number of token ranges times the number of splits, but can be slightly more.
- public int calculateSplits(CassandraRing<RingInstance> ring,
+ // Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits,
+ // but can be slightly more.
+ public int calculateSplits(TokenRangeMapping<RingInstance> tokenRangeMapping,
Integer numberSplits,
int defaultParallelism,
Integer cores)
{
- if (numberSplits >= 0)
+ if (numberSplits != -1)

Review Comment: This change is worrisome. There is no validation of the value of `numberSplits`. An end user can provide a wrong value, say `-10`; the original code can handle it, but the changed code breaks.

########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java:
##########
@@ -94,6 +95,12 @@ public Object[] normalize(Object[] row)
}
public Object[] getKeyColumns(Object[] allColumns)
+ {
+ return getKeyColumns(allColumns, keyFieldPositions);
+ }
+
+ @NotNull
+ public static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)

Review Comment: What is the benefit of allowing the key field positions to be specified? It looks like the method is only used for testing via the `Tokenizer` constructor. If so, can you add the `@VisibleForTesting` annotation?
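Making the EACH_QUORUM comment above concrete: a per-DC check could keep the shape of the pre-change code quoted in that hunk. A sketch with hypothetical inputs; `rfByDc` and `unavailableByDc` are assumptions standing in for the per-DC replication factors and the per-DC failed-plus-blocked replica counts:

```java
// Sketch: EACH_QUORUM succeeds only if every DC can still reach a quorum.
static boolean eachQuorumSatisfied(Map<String, Integer> rfByDc, Map<String, Integer> unavailableByDc)
{
    for (Map.Entry<String, Integer> entry : rfByDc.entrySet())
    {
        int rf = entry.getValue();
        int unavailable = unavailableByDc.getOrDefault(entry.getKey(), 0);
        // A quorum in a DC tolerates at most rf - (rf / 2 + 1) unavailable replicas
        if (unavailable > rf - (rf / 2 + 1))
        {
            return false;
        }
    }
    return true;
}
```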
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ########## @@ -59,42 +87,34 @@ public boolean isLocal() } @Override - public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts, - ReplicationFactor replicationFactor, + public boolean checkConsistency(Set<String> writeReplicas, + Set<String> pendingReplicas, + Set<String> replacementInstances, + Set<String> blockedInstances, + Set<String> failedInstanceIps, String localDC) { - Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy, - "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces"); - - for (String datacenter : replicationFactor.getOptions().keySet()) - { - int rf = replicationFactor.getOptions().get(datacenter); - if (failedInsts.stream() - .filter(instance -> instance.getDataCenter().matches(datacenter)) - .count() > (rf - (rf / 2 + 1))) - { - return false; - } - } - - return true; + return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1)); } }, QUORUM { + // Keyspaces exist with RF 1 or 2 Review Comment: This comment does not make sense to be included in QUORUM. ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java: ########## @@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange, Instance casInstance, Strin for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet()) { Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue()); - newErrorMap.put(casInstance, errMessage); mappingsToAdd.put(entry.getKey(), newErrorMap); } failedRangesMap.putAll(mappingsToAdd); } - public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC) + public List<Instance> getFailedInstances() { - return !getFailedEntries(consistencyLevel, localDC).isEmpty(); + return failedRangesMap.asMapOfRanges().values() + .stream() + .map(Multimap::keySet) + .flatMap(Collection::stream) + .collect(Collectors.toList()); } - @SuppressWarnings("unused") // Convenience method can become useful in the future - public Collection<Range<BigInteger>> getFailedRanges(ConsistencyLevel consistencyLevel, String localDC) + /** + * Given the number of failed instances for each token range, validates if the consistency guarantees are maintained + * for the size of the ring and the consistency level. + * + * @return list of failed entries for token ranges that break consistency. This should ideally be empty for a + * successful operation. + */ + public Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>> + getFailedEntries(TokenRangeMapping<? 
extends CassandraInstance> tokenRangeMapping,
+ ConsistencyLevel cl,
+ String localDC)
{
- return getFailedEntries(consistencyLevel, localDC).stream()
- .map(AbstractMap.SimpleEntry::getKey)
- .collect(Collectors.toList());
- }
- @SuppressWarnings("unused") // Convenience method can become useful in the future
- public Multimap<Instance, String> getFailedInstances(ConsistencyLevel consistencyLevel, String localDC)
- {
- Multimap<Instance, String> failedInstances = ArrayListMultimap.create();
- getFailedEntries(consistencyLevel, localDC).stream()
- .flatMap(failedMultiEntry -> failedMultiEntry.getValue().entries().stream())
- .forEach(failedEntry -> failedInstances.put(failedEntry.getKey(), failedEntry.getValue()));
+ List<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>> failedEntries =
+ new ArrayList<>();
+
+ for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> failedRangeEntry
+ : failedRangesMap.asMapOfRanges().entrySet())

Review Comment: it is unusual to break the declaration across two lines.

########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java:
##########
@@ -209,31 +222,49 @@ private InstancesConfig buildInstancesConfig(CassandraVersionProvider versionPro
int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
InetSocketAddress address = InetSocketAddress.createUnresolved(hostName, nativeTransportPort);
- CQLSessionProvider sessionProvider = new CQLSessionProvider(address, new NettyOptions());
+ addresses.add(address);
+ }
+ for (int i = 0; i < cluster.size(); i++)
+ {
+ IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names;
+ IInstanceConfig config = instance.config();
+ String hostName = JMXUtil.getJmxHost(config);
+ int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
+ InetSocketAddress address = InetSocketAddress.createUnresolved(hostName,
+ nativeTransportPort);
+ TemporaryCqlSessionProvider sessionProvider = new TemporaryCqlSessionProvider(address, new NettyOptions(), addresses);
this.sessionProviders.add(sessionProvider);
- JmxClient jmxClient = new JmxClient(hostName, config.jmxPort());
+ // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized.
+ // In these cases, we want to wait longer than the default retry/delay settings to connect.

Review Comment: 👍 thanks for the comment

########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/CassandraIntegrationTest.java:
##########
@@ -59,6 +59,13 @@
*/
int numDcs() default 1;
+ /**
+ * This is only applied in context of multi-DC tests. Returns true if the keyspace is replicated
+ * across multiple DCs. Defaults to {@code true}
+ * @return whether the multi-DC test uses a cross-DC keyspace
+ */
+ boolean useCrossDcKeyspace() default true;

Review Comment: It is more flexible to define keyspaces in the test directly with a helper method, instead of declaring an option in the annotation. For example, you can define keyspaces with varying numbers of DCs and RFs per test; a sketch of such a helper follows below. That being said, I am fine with the current code.
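As an illustration of the helper-method approach suggested above, a sketch with hypothetical names (`keyspaceDdl` is not in the PR; `schemaChange` is the in-jvm dtest cluster API):

```java
// Sketch: build a NetworkTopologyStrategy keyspace DDL from per-DC RFs
static String keyspaceDdl(String keyspace, Map<String, Integer> rfByDc)
{
    String replication = rfByDc.entrySet()
                               .stream()
                               .map(e -> String.format("'%s': %d", e.getKey(), e.getValue()))
                               .collect(Collectors.joining(", "));
    return String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'NetworkTopologyStrategy', %s};",
                         keyspace, replication);
}

// Usage in a test:
// cluster.schemaChange(keyspaceDdl("analytics", ImmutableMap.of("DC1", 3, "DC2", 2)));
```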
########## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenPartitionerTest.java: ########## @@ -58,27 +58,34 @@ public void testOneSplit() @Test public void testTwoSplits() { - CassandraRing<RingInstance> ring = RingUtils.buildRing(0, "DC1", "test"); - partitioner = new TokenPartitioner(ring, 2, 2, 1, false); - assertEquals(10, partitioner.numPartitions()); - assertEquals(0, getPartitionForToken(new BigInteger("-4611686018427387905"))); - assertEquals(1, getPartitionForToken(new BigInteger("-4611686018427387904"))); - assertEquals(1, getPartitionForToken(-1)); - assertEquals(2, getPartitionForToken(0)); // Single token range - assertEquals(3, getPartitionForToken(1)); - assertEquals(3, getPartitionForToken(50)); - assertEquals(4, getPartitionForToken(51000)); - assertEquals(4, getPartitionForToken(51100)); - assertEquals(5, getPartitionForToken(100001)); - assertEquals(5, getPartitionForToken(100150)); - assertEquals(5, getPartitionForToken(150000)); - assertEquals(6, getPartitionForToken(150001)); - assertEquals(6, getPartitionForToken(200000)); - assertEquals(7, getPartitionForToken(200001)); - assertEquals(7, getPartitionForToken(new BigInteger("4611686018427388003"))); - assertEquals(7, getPartitionForToken(new BigInteger("4611686018427487903"))); - assertEquals(8, getPartitionForToken(new BigInteger("4611686018427487904"))); - assertEquals(9, getPartitionForToken(new BigInteger("9223372036854775807"))); // Single token range + final TokenRangeMapping<RingInstance> tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3), 3); Review Comment: drop the final ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java: ########## @@ -26,4 +26,6 @@ public interface CassandraInstance String getNodeName(); String getDataCenter(); + + String getIpAddress(); Review Comment: add javadoc for the new method. It is fine to ignore the existing methods that are w/o javadoc. ########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ########## @@ -59,42 +87,34 @@ public boolean isLocal() } @Override - public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts, - ReplicationFactor replicationFactor, + public boolean checkConsistency(Set<String> writeReplicas, + Set<String> pendingReplicas, + Set<String> replacementInstances, + Set<String> blockedInstances, + Set<String> failedInstanceIps, String localDC) { - Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy, - "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces"); - - for (String datacenter : replicationFactor.getOptions().keySet()) - { - int rf = replicationFactor.getOptions().get(datacenter); - if (failedInsts.stream() - .filter(instance -> instance.getDataCenter().matches(datacenter)) - .count() > (rf - (rf / 2 + 1))) - { - return false; - } - } - - return true; + return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1)); Review Comment: Looks like you moved the per DC check in `ReplicaAwareFailureHandler#validateConsistency` ########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/TemporaryCqlSessionProvider.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.testing; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.NettyOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.exceptions.DriverInternalError; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.ReconnectionPolicy; +import org.apache.cassandra.sidecar.common.CQLSessionProvider; +import org.jetbrains.annotations.Nullable; + +public class TemporaryCqlSessionProvider extends CQLSessionProvider +{ + private static final Logger logger = LoggerFactory.getLogger(TemporaryCqlSessionProvider.class); + private Session localSession; + private final InetSocketAddress inet; + private final NettyOptions nettyOptions; + private final ReconnectionPolicy reconnectionPolicy; + private final List<InetSocketAddress> addresses = new ArrayList<>(); + + public TemporaryCqlSessionProvider(InetSocketAddress target, NettyOptions options, List<InetSocketAddress> addresses) + { + super(target, options); + inet = target; + nettyOptions = options; + reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000); + this.addresses.addAll(addresses); + } + + @Override + public synchronized @Nullable Session localCql() Review Comment: nit ```suggestion @Override @Nullable public synchronized Session localCql() ``` ########## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java: ########## @@ -209,31 +222,49 @@ private InstancesConfig buildInstancesConfig(CassandraVersionProvider versionPro int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042); InetSocketAddress address = InetSocketAddress.createUnresolved(hostName, nativeTransportPort); - CQLSessionProvider sessionProvider = new CQLSessionProvider(address, new NettyOptions()); + addresses.add(address); + } + for (int i = 0; i < cluster.size(); i++) + { + IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names; + IInstanceConfig config = instance.config(); + String hostName = JMXUtil.getJmxHost(config); + int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042); + InetSocketAddress address = InetSocketAddress.createUnresolved(hostName, + nativeTransportPort); + TemporaryCqlSessionProvider sessionProvider = new TemporaryCqlSessionProvider(address, new NettyOptions(), addresses); 
this.sessionProviders.add(sessionProvider);
- JmxClient jmxClient = new JmxClient(hostName, config.jmxPort());
+ // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized.
+ // In these cases, we want to wait longer than the default retry/delay settings to connect.

Review Comment: 👍 thanks for the comment
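Circling back to the `getIpAddress` javadoc request earlier in this review, a minimal doc comment could be enough; the wording below is an assumption about the intended semantics (bare IP, no port):

```java
/**
 * Returns the IP address of this Cassandra instance, without the port.
 *
 * @return the instance's IP address
 */
String getIpAddress();
```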