Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850889132

Attempt to reopen PR

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850889037

Attempt to reopen PR
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850885476

Attempt to reopen PR
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850873871

Reopen attempt
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850873520

Test
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok closed pull request #17: Cassandra 18852: Make bulk writer resilient to cluster resize events
URL: https://github.com/apache/cassandra-analytics/pull/17
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
yifan-c commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422438882

## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java: ##
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations;
+import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
+import org.apache.cassandra.spark.bulkwriter.Tokenizer;
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.common.schema.ColumnTypes;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import scala.Tuple2;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for resiliency tests. Contains helper methods for data generation and validation
+ */
+public abstract class ResiliencyTestBase extends IntegrationTestBase
+{
+    public static final int rowCount = 1000;
+    protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(ResiliencyTestBase.class);
+    private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));";
+
+    private final ExecutorService executorService = Executors.newCachedThreadPool();
+    private final AtomicReference errorOutput = new AtomicReference<>();
+    private final AtomicReference outputBytes = new AtomicReference<>();
+
+
+    public QualifiedTableName initializeSchema()
+    {
+        return initializeSchema(ImmutableMap.of("datacenter1", 1));
+    }
+
+    public QualifiedTableName initializeSchema(Map rf)
+    {
+        createTestKeyspace(rf);
+        return createTestTable(createTableStmt);
+    }
+
+    public Set getDataForRange(Range range)
+    {
+        // Iterate through all data entries; filter only entries that belong to range; convert to strings
+        return generateExpectedData().stream()
+                                     .filter(t -> range.contains(t._1().getToken()))
+
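In the quoted helper, `getDataForRange` keeps only the generated rows whose token falls inside the Guava `Range`. The sketch below reproduces that filter in plain Java so that the boundary behaviour is easy to see. It rests on three assumptions: token ranges follow Cassandra's usual half-open `(lower, upper]` form, which Guava's `Range.openClosed` models; wrap-around ranges are ignored; and a token-to-row map stands in for the test's `Tuple2`-based data. `RangeFilterSketch` is a hypothetical name and not part of the test base.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of ResiliencyTestBase.
final class RangeFilterSketch
{
    private RangeFilterSketch()
    {
    }

    // True when lower < token <= upper, i.e. the token lies in the half-open range (lower, upper].
    static boolean inOpenClosed(BigInteger lower, BigInteger upper, BigInteger token)
    {
        return token.compareTo(lower) > 0 && token.compareTo(upper) <= 0;
    }

    // Returns the rows whose token lies in (lower, upper]; wrap-around ranges are not handled.
    static Set<String> rowsInRange(Map<BigInteger, String> rowsByToken, BigInteger lower, BigInteger upper)
    {
        return rowsByToken.entrySet()
                          .stream()
                          .filter(e -> inOpenClosed(lower, upper, e.getKey()))
                          .map(Map.Entry::getValue)
                          .collect(Collectors.toSet());
    }
}
```

With `(0, 10]`, a row at token `0` is excluded and a row at token `10` is included. This matches the open lower bound and closed upper bound of Cassandra's token ownership.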
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
yifan-c commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422429361

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ##
@@ -339,140 +336,188 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping mapping = getTokenRangeMapping(true);
+        Map result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                         conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                         TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
-        {
-            return currentRingResponse;
-        }
-
-        synchronized (this)
+        Map> writeReplicasByDC;
+        Map> pendingReplicasByDC;
+        Map replicaMetadata;
+        Set blockedInstances;
+        Set replacementInstances;
+        Multimap> tokenRangesByInstance;
+        try
         {
-            if (ringResponse == null)
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
+
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .values()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata()
+                                       .values()
+                                       .stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                             .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges
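The new `determineInstanceAvailability` checks its conditions in a fixed order. As a result, a replica that is both down and blocked is reported as `UNAVAILABLE_DOWN`. The minimal sketch below illustrates that ordering with a simplified input of two booleans and a state string in place of a `RingInstance`. The enum constants mirror the diff. `AvailabilitySketch`, the stand-in `Availability` enum, and the set of state names used for `instanceIsNormal`, `instanceIsTransitioning` and `instanceIsBeingReplaced` are all assumptions.

```java
import java.util.Locale;
import java.util.Set;

// Stand-in enum whose constants mirror InstanceAvailability in the diff.
enum Availability
{
    AVAILABLE, UNAVAILABLE_DOWN, UNAVAILABLE_BLOCKED, INVALID_STATE
}

// Hypothetical helper for illustration; not a cassandra-analytics class.
final class AvailabilitySketch
{
    // Assumption: state names treated as "normal", "transitioning" or "being replaced".
    private static final Set<String> USABLE_STATES =
        Set.of("NORMAL", "JOINING", "LEAVING", "MOVING", "REPLACING");

    private AvailabilitySketch()
    {
    }

    static Availability determine(boolean isUp, boolean isBlocked, String state)
    {
        // Checked first, so a down replica is never reported as merely blocked.
        if (!isUp)
        {
            return Availability.UNAVAILABLE_DOWN;
        }
        if (isBlocked)
        {
            return Availability.UNAVAILABLE_BLOCKED;
        }
        if (state != null && USABLE_STATES.contains(state.toUpperCase(Locale.ROOT)))
        {
            return Availability.AVAILABLE;
        }
        // Any other state falls through to INVALID_STATE, as in the diff.
        return Availability.INVALID_STATE;
    }
}
```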
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422019437

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ##
@@ -59,42 +87,34 @@ public boolean isLocal()
         }
 
         @Override
-        public boolean checkConsistency(Collection failedInsts,
-                                        ReplicationFactor replicationFactor,
+        public boolean checkConsistency(Set writeReplicas,
+                                        Set pendingReplicas,
+                                        Set replacementInstances,
+                                        Set blockedInstances,
+                                        Set failedInstanceIps,
                                         String localDC)
         {
-            Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy,
-                                        "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces");
-
-            for (String datacenter : replicationFactor.getOptions().keySet())
-            {
-                int rf = replicationFactor.getOptions().get(datacenter);
-                if (failedInsts.stream()
-                               .filter(instance -> instance.getDataCenter().matches(datacenter))
-                               .count() > (rf - (rf / 2 + 1)))
-                {
-                    return false;
-                }
-            }
-
-            return true;
+            return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));

Review Comment:
Correct, the idea was to keep this class confined to CL validation
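The rewritten `checkConsistency` reduces the check to a single inequality over the write replica set. The number of failed replicas plus the number of blocked replicas must not exceed `n - (n / 2 + 1)`, which is the number of replicas that can be lost while a quorum of `n` remains reachable. The small sketch below reproduces that arithmetic. It uses a hypothetical `QuorumCheck` helper (not part of cassandra-analytics) that keeps the diff's summation exactly as written.

```java
import java.util.Set;

// Hypothetical helper for illustration; not part of cassandra-analytics.
final class QuorumCheck
{
    private QuorumCheck()
    {
    }

    // Replicas needed for a quorum of n: floor(n / 2) + 1.
    static int quorum(int replicaCount)
    {
        return replicaCount / 2 + 1;
    }

    // Replicas that may be lost while a quorum is still reachable.
    static int maxTolerableFailures(int replicaCount)
    {
        return replicaCount - quorum(replicaCount);
    }

    // Same inequality as the diff: failed + blocked <= n - (n / 2 + 1).
    static boolean satisfiesQuorum(Set<String> writeReplicas,
                                   Set<String> blockedInstances,
                                   Set<String> failedInstanceIps)
    {
        int unusable = failedInstanceIps.size() + blockedInstances.size();
        return unusable <= maxTolerableFailures(writeReplicas.size());
    }
}
```

The tolerances work out as follows:

- For `n = 3`, a quorum is 2, so one replica may fail.
- For `n = 4`, a quorum is 3, so still only one may fail.
- For `n = 5`, two may fail.

Because the two set sizes are added, an instance that is both blocked and failed is counted twice, both in this sketch and in the quoted expression, unless the caller de-duplicates the sets beforehand.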
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422015166

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinator.java: ##
@@ -128,61 +128,51 @@ private Stream> commit(Map> uploadRanges)
     {
-        if (cluster.instanceIsAvailable(instance))

Review Comment:
The expectation is for the instance to be available or transitioning at this point. If the instance does happen to be down, it should result in a commit failure, which is further evaluated against the CL requirements.
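The reply above describes a commit path without an up-front availability filter. Every replica receives a commit attempt, and a down replica simply produces a failed result that is later weighed against the consistency level. The minimal sketch below shows that flow under a few assumptions:

- A hypothetical `CommitAllSketch` class stands in for the real coordinator.
- A `Consumer<String>` that throws on failure represents the commit call.
- A plain quorum check represents the CL evaluation.

The real `CommitCoordinator` works with `CommitResult` objects and token ranges instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not the real CommitCoordinator.
final class CommitAllSketch
{
    private CommitAllSketch()
    {
    }

    // Outcome of one commit attempt; error is null when the commit succeeded.
    static final class Outcome
    {
        final String instance;
        final String error;

        Outcome(String instance, String error)
        {
            this.instance = instance;
            this.error = error;
        }

        boolean failed()
        {
            return error != null;
        }
    }

    // Attempts the commit on every replica; a down replica is not skipped but recorded as a failure.
    static List<Outcome> commitAll(List<String> replicas, Consumer<String> commit)
    {
        List<Outcome> outcomes = new ArrayList<>();
        for (String replica : replicas)
        {
            try
            {
                commit.accept(replica);
                outcomes.add(new Outcome(replica, null));
            }
            catch (RuntimeException e)
            {
                outcomes.add(new Outcome(replica, String.valueOf(e.getMessage())));
            }
        }
        return outcomes;
    }

    // Evaluates the collected failures against a quorum of the attempted replicas.
    static boolean meetsQuorum(List<Outcome> outcomes)
    {
        long failures = outcomes.stream().filter(Outcome::failed).count();
        int n = outcomes.size();
        return failures <= n - (n / 2 + 1);
    }
}
```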
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422010350

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java: ##
@@ -94,6 +95,12 @@ public Object[] normalize(Object[] row)
     }
 
     public Object[] getKeyColumns(Object[] allColumns)
+    {
+        return getKeyColumns(allColumns, keyFieldPositions);
+    }
+
+    @NotNull
+    public static Object[] getKeyColumns(Object[] allColumns, List keyFieldPositions)

Review Comment:
Testing only. Added
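The quoted hunk shows only the signature of the new static `getKeyColumns(Object[], List)` overload. Making it static lets a test extract key values from a row without building a `TableSchema`. The sketch below shows what such a helper plausibly does. It assumes that `keyFieldPositions` holds the zero-based indexes of the key columns in key order. `KeyColumnsSketch` is a hypothetical name, and the body is an assumption rather than the PR's code.

```java
import java.util.List;

// Hypothetical helper for illustration; the real method lives in TableSchema.
final class KeyColumnsSketch
{
    private KeyColumnsSketch()
    {
    }

    // Picks the key column values out of a full row, in the order given by keyFieldPositions.
    static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)
    {
        Object[] keys = new Object[keyFieldPositions.size()];
        for (int i = 0; i < keys.length; i++)
        {
            keys[i] = allColumns[keyFieldPositions.get(i)];
        }
        return keys;
    }
}
```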
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422005002

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ##
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping mapping = getTokenRangeMapping(true);
+        Map result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                         conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                         TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map> writeReplicasByDC;
+        Map> pendingReplicasByDC;
+        List replicaMetadata;
+        Set blockedInstances;
+        Set replacementInstances;
+        Multimap> tokenRangesByInstance;
+        try
         {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                             .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+
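The comment near the end of the quoted hunk says that the per-range `{DC -> hosts}` replica sets are collated into a single `{DC -> hosts}` view. The code that performs this step is cut off, so what follows is only a sketch of the collation under stated assumptions. Each token range is assumed to contribute a map from datacenter name to a list of hosts, and duplicate hosts are merged through a `Set`. `CollateSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration; not the code in CassandraClusterInfo.
final class CollateSketch
{
    private CollateSketch()
    {
    }

    // Merges per-range {DC -> hosts} maps into one {DC -> all hosts} map, de-duplicating hosts.
    static Map<String, Set<String>> collateByDc(List<Map<String, List<String>>> replicasPerRange)
    {
        Map<String, Set<String>> hostsByDc = new HashMap<>();
        for (Map<String, List<String>> rangeReplicas : replicasPerRange)
        {
            rangeReplicas.forEach((dc, hosts) -> hostsByDc.computeIfAbsent(dc, k -> new HashSet<>()).addAll(hosts));
        }
        return hostsByDc;
    }
}
```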
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422004520

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java: ##
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping mapping = getTokenRangeMapping(true);
+        Map result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                         conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                         TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map> writeReplicasByDC;
+        Map> pendingReplicasByDC;
+        List replicaMetadata;
+        Set blockedInstances;
+        Set replacementInstances;
+        Multimap> tokenRangesByInstance;
+        try
         {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                             .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()
+
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420983422

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java: ##
@@ -98,39 +95,19 @@ public void insert(@NotNull Dataset data, boolean overwrite)
         Tokenizer tokenizer = new Tokenizer(writerContext);
         TableSchema tableSchema = writerContext.schema().getTableSchema();
         JavaPairRDD sortedRDD = data.toJavaRDD()
-                .map(Row::toSeq)
-                .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
-                .map(tableSchema::normalize)
-                .keyBy(tokenizer::getDecoratedKey)
-                .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
+                                                      .map(Row::toSeq)
+                                                      .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
+                                                      .map(tableSchema::normalize)
+                                                      .keyBy(tokenizer::getDecoratedKey)
+                                                      .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());

Review Comment:
Agreed, this is out of scope for this PR. I believe this was applied by the code-style plugin. Will leave it as-is for now.
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17:
URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420981329

## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java: ##
@@ -23,103 +23,120 @@
 import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 
-public class BulkWriteValidator implements AutoCloseable
+public class BulkWriteValidator
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteValidator.class);
 
     private final ReplicaAwareFailureHandler failureHandler;
-    private final CassandraRingMonitor monitor;
     private final JobInfo job;
     private String phase = "Initializing";
 
     private final ClusterInfo cluster;
 
     public BulkWriteValidator(BulkWriterContext bulkWriterContext,
-                              Consumer cancelJobFunc) throws Exception
+                              ReplicaAwareFailureHandler failureHandler)
     {
         cluster = bulkWriterContext.cluster();
         job = bulkWriterContext.job();
-        failureHandler = new ReplicaAwareFailureHandler<>(cluster.getRing(true));
-        monitor = new CassandraRingMonitor(cluster, cancelJobFunc, 1000, TimeUnit.MILLISECONDS);
+        this.failureHandler = failureHandler;
     }
 
-    public void setPhase(String phase)
-    {
-        this.phase = phase;
-    }
-
-    public String getPhase()
-    {
-        return phase;
-    }
-
-    public void validateInitialEnvironment()
-    {
-        validateCLOrFail();
-    }
-
-    public void close()
-    {
-        monitor.stop();
-    }
-
-    private void validateCLOrFail()
-    {
-        updateInstanceAvailability();
-        validateClOrFail(failureHandler, LOGGER, phase, job);
-    }
-
-    public static void validateClOrFail(ReplicaAwareFailureHandler failureHandler,
+    public static void validateClOrFail(TokenRangeMapping tokenRangeMapping,
+                                        ReplicaAwareFailureHandler failureHandler,
                                         Logger logger,
                                         String phase,
                                         JobInfo job)
     {
         Collection, Multimap>> failedRanges =
-                failureHandler.getFailedEntries(job.getConsistencyLevel(), job.getLocalDC());
+                failureHandler.getFailedEntries(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
+
         if (failedRanges.isEmpty())
         {
             logger.info("Succeeded {} with {}", phase, job.getConsistencyLevel());
         }
         else
         {
-            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s",
+            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.",
                                            failedRanges.size(), job.getConsistencyLevel(), job.getId(), phase);
             logger.error(message);
-            failedRanges.forEach(failedRange -> failedRange.getValue().keySet().forEach(instance ->
-                    logger.error("Failed {} for {} on {}", phase, failedRange.getKey(), instance.toString())));
+            failedRanges.forEach(failedRange ->
+                                 failedRange.getValue()
+                                            .keySet()
+                                            .forEach(instance ->
+                                                     logger.error("Failed {} for {} on {}",
+                                                                  phase,
+                                                                  failedRange.getKey(),
+                                                                  instance.toString())));
             throw new RuntimeException(message);
         }
     }
 
-    public static void updateFailureHandler(CommitResult commitResult,
-                                            String phase,
+    public String getPhase()
+    {
+        return phase;
+    }
+
+    public void setPhase(String phase)
+    {
+        this.phase = phase;
+    }
+
+    public static void updateFailureHandler(CommitResult commitResult, String phase,
                                             ReplicaAwareFailureHandler failureHandler)
     {
         LOGGER.debug("Commit Result: {}", commitResult);
         commitResult.failures.forEach((uuid, err) -> {
             LOGGER.warn("[{}]: {} failed on {} with message {}",
-                        uuid, phase, commitResult.instance.getNodeName(), err.errMsg);
+                        uuid,
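The reworked `validateClOrFail` now receives the `TokenRangeMapping` explicitly, but its reporting logic is unchanged. When no ranges failed, it logs success. Otherwise, it logs each failed range and instance and then throws. The sketch below reproduces that control flow. It assumes that failed ranges arrive as a plain map from a range label to instance names, and it uses `java.util.logging` in place of SLF4J. `ValidateOrFailSketch` is hypothetical; the message format is copied from the new side of the diff.

```java
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical helper for illustration; uses java.util.logging instead of SLF4J.
final class ValidateOrFailSketch
{
    private static final Logger LOGGER = Logger.getLogger(ValidateOrFailSketch.class.getName());

    private ValidateOrFailSketch()
    {
    }

    // Logs success when nothing failed; otherwise logs every failed range/instance pair and throws.
    static void validateOrFail(Map<String, Set<String>> failedRanges,
                               String consistencyLevel,
                               String jobId,
                               String phase)
    {
        if (failedRanges.isEmpty())
        {
            LOGGER.info(String.format("Succeeded %s with %s", phase, consistencyLevel));
            return;
        }
        String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.",
                                       failedRanges.size(), consistencyLevel, jobId, phase);
        LOGGER.severe(message);
        failedRanges.forEach((range, instances) ->
                                 instances.forEach(instance ->
                                                       LOGGER.severe(String.format("Failed %s for %s on %s",
                                                                                   phase, range, instance))));
        throw new RuntimeException(message);
    }
}
```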
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420975817 ## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java: ## @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.analytics; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.ConsistencyLevel; +import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations; +import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient; +import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.spark.bulkwriter.DecoratedKey; +import org.apache.cassandra.spark.bulkwriter.Tokenizer; +import 
org.apache.cassandra.spark.common.schema.ColumnType; +import org.apache.cassandra.spark.common.schema.ColumnTypes; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import scala.Tuple2; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for resiliency tests. Contains helper methods for data generation and validation + */ +public abstract class ResiliencyTestBase extends IntegrationTestBase +{ +public static final int rowCount = 1000; +protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s"; +private static final Logger LOGGER = LoggerFactory.getLogger(ResiliencyTestBase.class); +private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));"; + +private final ExecutorService executorService = Executors.newCachedThreadPool(); +private final AtomicReference errorOutput = new AtomicReference<>(); +private final AtomicReference outputBytes = new AtomicReference<>(); + + +public QualifiedTableName initializeSchema() +{ +return initializeSchema(ImmutableMap.of("datacenter1", 1)); +} + +public QualifiedTableName initializeSchema(Map rf) +{ +createTestKeyspace(rf); +return createTestTable(createTableStmt); +} + +public Set getDataForRange(Range range) +{ +// Iterate through all data entries; filter only entries that belong to range; convert to strings +return generateExpectedData().stream() + .filter(t -> range.contains(t._1().getToken())) +
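The `getDataForRange` helper quoted above keeps only the generated rows whose token falls inside a given range. Here is a stdlib-only sketch of the same filtering. It assumes a hypothetical `TokenedRow` holder in place of the `Tuple2` rows, and explicit bounds in place of Guava's `Range`. The bounds follow Cassandra's (lower, upper] convention for token ownership.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for a generated row: its token plus a string rendering of the row.
final class TokenedRow
{
    final BigInteger token;
    final String value;

    TokenedRow(BigInteger token, String value)
    {
        this.token = token;
        this.value = value;
    }
}

final class RangeFilter
{
    // Keeps rows whose token lies in (lowerExclusive, upperInclusive] and returns their
    // string renderings, mirroring "filter entries that belong to range; convert to strings".
    static Set<String> dataForRange(List<TokenedRow> rows, BigInteger lowerExclusive, BigInteger upperInclusive)
    {
        return rows.stream()
                   .filter(r -> r.token.compareTo(lowerExclusive) > 0 && r.token.compareTo(upperInclusive) <= 0)
                   .map(r -> r.value)
                   .collect(Collectors.toSet());
    }
}
```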
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420974122 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java: ## @@ -50,13 +51,36 @@ private TokenRangeMappingUtils() public static TokenRangeMapping buildTokenRangeMapping(final int initialToken, final ImmutableMap rfByDC, int instancesPerDC) { +return buildTokenRangeMapping(initialToken, rfByDC, instancesPerDC, false, -1); +} +public static TokenRangeMapping buildTokenRangeMappingWithFailures(int initialToken, + ImmutableMap rfByDC, + int instancesPerDC) +{ final List instances = getInstances(initialToken, rfByDC, instancesPerDC); +RingInstance instance = instances.remove(0); +RingEntry entry = instance.getRingInstance(); +RingEntry newEntry = new RingEntry.Builder() + .datacenter(entry.datacenter()) + .port(entry.port()) + .address(entry.address()) + .status(InstanceStatus.DOWN.name()) + .state(entry.state()) + .token(entry.token()) + .fqdn(entry.fqdn()) + .rack(entry.rack()) + .owns(entry.owns()) + .load(entry.load()) + .hostId(entry.hostId()) + .build(); +RingInstance newInstance = new RingInstance(newEntry); +instances.add(0, newInstance); ReplicationFactor replicationFactor = getReplicationFactor(rfByDC); Map> writeReplicas = -instances.stream() - .collect(Collectors.groupingBy(RingInstance::getDataCenter, - Collectors.mapping(RingInstance::getNodeName, Collectors.toSet(; + instances.stream().collect(Collectors.groupingBy(RingInstance::getDataCenter, + Collectors.mapping(RingInstance::getNodeName, + Collectors.toSet(; Review Comment: This is a new file though
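`buildTokenRangeMappingWithFailures` marks the first instance DOWN by copying every field of its `RingEntry` through the builder. Below is a minimal sketch of a `toBuilder()`-style copy that overrides only the status. It uses a hypothetical, trimmed-down `RingEntrySketch` class; the real `RingEntry` may not offer such a method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down immutable entry (not the real RingEntry API).
final class RingEntrySketch
{
    final String address;
    final String status;
    final String token;

    private RingEntrySketch(Builder b)
    {
        this.address = b.address;
        this.status = b.status;
        this.token = b.token;
    }

    // Pre-populates a builder from this entry so callers override only the fields that change.
    Builder toBuilder()
    {
        return new Builder().address(address).status(status).token(token);
    }

    static final class Builder
    {
        private String address;
        private String status;
        private String token;

        Builder address(String value) { this.address = value; return this; }
        Builder status(String value) { this.status = value; return this; }
        Builder token(String value) { this.token = value; return this; }
        RingEntrySketch build() { return new RingEntrySketch(this); }
    }
}

final class Failures
{
    // Replaces the first entry with a copy marked DOWN, leaving every other entry and field intact.
    static List<RingEntrySketch> markFirstDown(List<RingEntrySketch> entries)
    {
        List<RingEntrySketch> copy = new ArrayList<>(entries);
        RingEntrySketch first = copy.remove(0);
        copy.add(0, first.toBuilder().status("DOWN").build());
        return copy;
    }
}
```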
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420972314 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ## @@ -69,64 +78,217 @@ public class RecordWriterTest @BeforeEach public void setUp() { -tw = new MockTableWriter(folder); -ring = RingUtils.buildRing(0, "DC1", "test", 12); -writerContext = new MockBulkWriterContext(ring); +tw = new MockTableWriter(folder.getRoot()); +tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(10, ImmutableMap.of("DC1", 3), 12); +writerContext = new MockBulkWriterContext(tokenRangeMapping); tc = new TestTaskContext(); range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId()); tokenizer = new Tokenizer(writerContext); } @Test -public void testSuccessfulWrite() +public void testWriteFailWhenTopologyChangeWithinTask() +{ +// Generate token range mapping to simulate node movement of the first node by assigning it a different token +// within the same partition +int moveTargetToken = 5; +TokenRangeMapping testMapping = +TokenRangeMappingUtils.buildTokenRangeMapping(10, + ImmutableMap.of("DC1", 3), + 12, + true, + moveTargetToken); + +MockBulkWriterContext m = Mockito.spy(writerContext); +rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + + when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping); +Iterator> data = generateData(5, true); +RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); +assertThat(ex.getMessage(), endsWith("Token range mappings have changed since the task started")); +} + +@Test +public void testWriteWithExclusions() +{ +TokenRangeMapping testMapping = +TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(10, + ImmutableMap.of("DC1", 3), + 12); + +MockBulkWriterContext m = Mockito.spy(writerContext); +rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); + 
+when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); +when(m.getInstanceAvailability()).thenCallRealMethod(); +Iterator> data = generateData(5, true); +rw.write(data); +Map> uploads = writerContext.getUploads(); +assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas +} + +@Test +public void testSuccessfulWrite() throws InterruptedException { Iterator> data = generateData(5, true); validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @Test -public void testWriteWithConstantTTL() +public void testSuccessfulWriteCheckUploads() { -MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); +rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new); +Iterator> data = generateData(5, true); +rw.write(data); +Map> uploads = writerContext.getUploads(); +assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas +assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES)); +List requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList()); +for (UploadRequest ur : requests) +{ +assertNotNull(ur.fileHash); +} +} + +@Test +public void testWriteWithConstantTTL() throws InterruptedException +{ +MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator> data = generateData(5, true, false, false); validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); } @Test -public void testWriteWithTTLColumn() +public void testWriteWithTTLColumn() throws InterruptedException { -MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); +MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); Iterator> data = generateData(5, true, true, false); -String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"}; +String[] columnNamesWithTtl = +{ +"id", "date", "course", "marks", "ttl" 
+}; validateSuccessfulWrite(bulkWriterCon
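`testWriteFailWhenTopologyChangeWithinTask` stubs `getTokenRangeMapping(false)` so that the first call returns the real mapping and later calls return a moved-node mapping. The test then expects the write to fail with "Token range mappings have changed since the task started". The check under test can be sketched as a snapshot-and-compare around the task's writes. `TopologyGuard` and its signature are hypothetical, and any `equals`-comparable mapping stands in for `TokenRangeMapping`.

```java
import java.util.Objects;
import java.util.function.Supplier;

final class TopologyGuard
{
    // Hypothetical sketch: snapshot the mapping when the task starts, re-read it after the
    // writes, and fail the task if the two snapshots differ.
    static <T> void runWithTopologyCheck(Supplier<T> mappingSupplier, Runnable writes)
    {
        T initial = mappingSupplier.get();
        writes.run();
        T current = mappingSupplier.get();
        if (!Objects.equals(initial, current))
        {
            throw new RuntimeException("Token range mappings have changed since the task started");
        }
    }
}
```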
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420965872 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java: ## @@ -71,64 +72,138 @@ public void addFailure(Range tokenRange, Instance casInstance, Strin for (Map.Entry, Multimap> entry : overlappingFailures.asMapOfRanges().entrySet()) { Multimap newErrorMap = ArrayListMultimap.create(entry.getValue()); - newErrorMap.put(casInstance, errMessage); mappingsToAdd.put(entry.getKey(), newErrorMap); } failedRangesMap.putAll(mappingsToAdd); } -public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC) +public List getFailedInstances() Review Comment: Yep, that should be a `Set`
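`getFailedInstances` flattens failures that are recorded per token range. The same instance can appear under several ranges, which is why a `Set` fits better than a `List`. The sketch below uses a plain `Map` from a range label to failed instance names; this is hypothetical, since the handler itself keeps Guava multimaps keyed by range.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class FailedInstances
{
    // Each failed range maps to the instances that failed for it. An instance that failed in
    // several ranges would appear once per range in a List; the Set keeps it once.
    static Set<String> failedInstances(Map<String, List<String>> failuresByRange)
    {
        return failuresByRange.values()
                              .stream()
                              .flatMap(List::stream)
                              .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```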
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420954777 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java: ## @@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE // In order to best utilize the number of Spark cores while minimizing the number of commit calls, // we calculate the number of splits that will just match or exceed the total number of available Spark cores. -// NOTE: The actual number of partitions that result from this should always be at least -// the number of token ranges times the number of splits, but can be slightly more. +// Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits, +// but can be slightly more. -public int calculateSplits(CassandraRing ring, +public int calculateSplits(TokenRangeMapping tokenRangeMapping, Integer numberSplits, int defaultParallelism, Integer cores) { -if (numberSplits >= 0) +if (numberSplits != -1) Review Comment: Yep, the previous check makes sense. Don't recall why this change was needed. Will revert.
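The two guards differ for negative values other than the `-1` default. `numberSplits != -1` treats `-2` as an explicit split count, while `numberSplits >= 0` sends every negative value down the calculated path. Below is a simplified, hypothetical `calculateSplits`. It does not match the real signature, which also takes `defaultParallelism` and a boxed `cores`. It computes the smallest split count whose product with the number of token ranges meets or exceeds the available cores, as the quoted comment describes.

```java
final class Splits
{
    // Non-negative values are an explicit user choice; any negative value (including the
    // -1 default) means "derive the split count from the available cores".
    static int calculateSplits(int tokenRanges, int numberSplits, int cores)
    {
        if (numberSplits >= 0)
        {
            return numberSplits;
        }
        // Ceiling division: the smallest splits value with tokenRanges * splits >= cores.
        return Math.max(1, (cores + tokenRanges - 1) / tokenRanges);
    }
}
```

With 12 token ranges and 100 cores, both `-1` and `-2` yield 9 splits (108 partitions, just above 100).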
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420911706 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java: ## @@ -145,32 +157,27 @@ private List commit(StreamResult streamResult) throws ExecutionExc @VisibleForTesting List getReplicas() { -Map, List> overlappingRanges = ring.getSubRanges(tokenRange).asMapOfRanges(); +List exclusions = failureHandler.getFailedInstances(); +final Map, List> overlappingRanges = tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges(); -Preconditions.checkState(overlappingRanges.keySet().size() == 1, - String.format("Partition range %s is mapping more than one range %s", - tokenRange, overlappingRanges)); +LOGGER.debug("[{}]: Stream session token range: {} overlaps with ring ranges: {}", sessionID, tokenRange, overlappingRanges); +Preconditions.checkState(!tokenRange.isEmpty(), Review Comment: Moved this up to the point where we extract the token range for the task/partition. We do not need to do this check for overlapping ranges, as it is implicitly validated if we do not have any resulting replicas.
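With the old precondition gone, `getReplicas` relies on an empty replica set to surface a bad range. The sketch below excludes failed instances and fails fast when nothing remains. `ReplicaSelection` and the string instance names are hypothetical stand-ins for the stream session's `RingInstance` handling.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ReplicaSelection
{
    // Drops replicas that previously failed and throws if none remain. An empty result is
    // also what a token range with no overlapping ring range would produce.
    static List<String> replicasExcluding(Collection<String> replicas, Set<String> exclusions)
    {
        List<String> remaining = replicas.stream()
                                         .filter(r -> !exclusions.contains(r))
                                         .collect(Collectors.toList());
        if (remaining.isEmpty())
        {
            throw new IllegalStateException("No replicas available after excluding failed instances " + exclusions);
        }
        return remaining;
    }
}
```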
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420859094 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -175,30 +324,31 @@ public void writeRow(Map valueMap, } } -void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException +/** + * Stream to replicas; if batchSize is reached, "finalize" SST to "schedule" streamSession + */ +private void checkBatchSize(final StreamSession streamSession, final int partitionId, final JobInfo job) throws IOException Review Comment: done
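The new Javadoc on `checkBatchSize` describes a threshold check. Once the batch reaches its configured size, the current SSTable is finalized and scheduled on the stream session. Below is a minimal counter sketch of that decision. The `BatchTracker` class is hypothetical and only reports when to flush; the SSTable and session work is left to the caller.

```java
final class BatchTracker
{
    private final int maxBatchSize;
    private int batchSize = 0;
    private int batchNumber = 0;

    BatchTracker(int maxBatchSize)
    {
        this.maxBatchSize = maxBatchSize;
    }

    // Call once per written row. Returns true when the batch is full, i.e. when the caller
    // should finalize the current SSTable and schedule it for streaming.
    boolean recordRowAndCheck()
    {
        batchSize++;
        if (batchSize >= maxBatchSize)
        {
            batchNumber++;
            batchSize = 0;
            return true;
        }
        return false;
    }

    int batchNumber()
    {
        return batchNumber;
    }
}
```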
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420858512 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -136,35 +181,139 @@ public StreamResult write(Iterator> sourceIterato } } +private Map, List> taskTokenRangeMapping(TokenRangeMapping tokenRange, + Range taskTokenRange) +{ +return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges(); +} + +private Set instancesFromMapping(Map, List> mapping) +{ +return mapping.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); +} + +/** + * Creates a new session if we have the current token range intersecting the ranges from write replica-set. + * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range + * if the token from the row data belongs to the range. + */ +private StreamSession maybeCreateStreamSession(TaskContext taskContext, + StreamSession streamSession, + Tuple2 rowData, + Set> subRanges, + ReplicaAwareFailureHandler failureHandler, + List results) +throws IOException, ExecutionException, InterruptedException +{ +BigInteger token = rowData._1().getToken(); +Range tokenRange = getTokenRange(taskContext); + +Preconditions.checkState(tokenRange.contains(token), + String.format("Received Token %s outside of expected range %s", token, tokenRange)); + +// We have split ranges likely resulting from pending nodes +// Evaluate creating a new session if the token from current row is part of a sub-range +if (subRanges.size() > 1) +{ +// Create session using sub-range that contains the token from current row +Range matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); +Preconditions.checkState(matchingSubRange != null, + String.format("Received Token %s outside of expected range %s", token, matchingSubRange)); +streamSession = maybeCreateSubRangeSession(taskContext, streamSession, 
failureHandler, results, matchingSubRange); +} + +// If we do not have any stream session at this point, we create a session using the partition's token range +return (streamSession == null) ? createStreamSession(taskContext) : streamSession; +} + +/** + * Given that the token belongs to a sub-range, creates a new stream session if either + * 1) we do not have an existing stream session, or 2) the existing stream session corresponds to a range that + * does NOT match the sub-range the token belongs to. + */ +private StreamSession maybeCreateSubRangeSession(TaskContext taskContext, + StreamSession streamSession, + ReplicaAwareFailureHandler failureHandler, + List results, + Range matchingSubRange) +throws IOException, ExecutionException, InterruptedException +{ +if (streamSession == null || streamSession.getTokenRange() != matchingSubRange) +{ +LOGGER.debug("[{}] Creating stream session for range: {}", taskContext.partitionId(), matchingSubRange); +// Schedule data to be sent if we are processing a batch that has not been scheduled yet. +if (streamSession != null) +{ +// Complete existing batched writes (if any) before the existing stream session is closed +if (batchSize != 0) +{ +finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize); +sstableWriter = null; +batchSize = 0; +} +results.add(streamSession.close()); +} +streamSession = new StreamSession(writerContext, getStreamId(taskContext), matchingSubRange, failureHandler); +} +return streamSession; +} + +/** + * Get ranges from the set that intersect and/or overlap with the provided token range + */ +private Set> getIntersectingSubRanges(Set> ranges, Range tokenRange) +{ +return ranges.stream() + .filter(r -> r.isConne
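The quoted `maybeCreateStreamSession` code has two Java-level pitfalls. First, `findFirst().get()` throws `NoSuchElementException` when no sub-range contains the token, so the `matchingSubRange != null` precondition after it can never fail. Second, `streamSession.getTokenRange() != matchingSubRange` compares references rather than range bounds. The sketch below handles both steps with `orElseThrow` and `equals`. It uses a hypothetical `TokenRange` class in place of Guava's `Range<BigInteger>`.

```java
import java.math.BigInteger;
import java.util.Collection;
import java.util.Objects;

// Hypothetical open-closed token range (lower, upper] with value-based equality.
final class TokenRange
{
    final BigInteger lower;
    final BigInteger upper;

    TokenRange(long lower, long upper)
    {
        this.lower = BigInteger.valueOf(lower);
        this.upper = BigInteger.valueOf(upper);
    }

    boolean contains(BigInteger token)
    {
        return token.compareTo(lower) > 0 && token.compareTo(upper) <= 0;
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof TokenRange))
        {
            return false;
        }
        TokenRange other = (TokenRange) o;
        return lower.equals(other.lower) && upper.equals(other.upper);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(lower, upper);
    }
}

final class SubRanges
{
    // Returns the sub-range owning the token, failing with a descriptive message instead of
    // the bare NoSuchElementException that Optional.get() would throw.
    static TokenRange matching(Collection<TokenRange> subRanges, BigInteger token)
    {
        return subRanges.stream()
                        .filter(r -> r.contains(token))
                        .findFirst()
                        .orElseThrow(() -> new IllegalStateException("Received token " + token + " outside of all sub-ranges"));
    }

    // A new session is needed only when there is none or its range has different bounds.
    static boolean needsNewSession(TokenRange current, TokenRange matching)
    {
        return current == null || !current.equals(matching);
    }
}
```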
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420855953 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -110,20 +133,42 @@ public StreamResult write(Iterator> sourceIterato Map valueMap = new HashMap<>(); try { +Set> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() + .stream() + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); Review Comment: makes sense.
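The stream over `asMapOfRanges().entrySet()` only extracts keys. Guava's `RangeMap.asMapOfRanges()` returns an ordinary `Map`, so copying its `keySet()` yields the same set. Assuming that is the simplification being agreed to, here is a sketch with a plain `TreeMap` standing in for the range map.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class RangeKeys
{
    // Streaming the entry set only to pull out the keys...
    static <K, V> Set<K> viaEntryStream(Map<K, V> map)
    {
        return map.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toSet());
    }

    // ...produces the same set as copying keySet() directly.
    static <K, V> Set<K> viaKeySet(Map<K, V> map)
    {
        return new HashSet<>(map.keySet());
    }
}
```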
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419770583 ## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java: ## @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.analytics; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.management.ManagementFactory; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; +import org.junit.jupiter.api.BeforeAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.ConsistencyLevel; +import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations; +import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient; +import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.spark.bulkwriter.DecoratedKey; +import org.apache.cassandra.spark.bulkwriter.Tokenizer; +import 
org.apache.cassandra.spark.common.schema.ColumnType; +import org.apache.cassandra.spark.common.schema.ColumnTypes; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import scala.Tuple2; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for resiliency tests. Contains helper methods for data generation and validation + */ +public abstract class ResiliencyTestBase extends IntegrationTestBase +{ +public static final int rowCount = 1000; +protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s"; +private static final Logger LOGGER = LoggerFactory.getLogger(ResiliencyTestBase.class); +private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));"; + +private final ExecutorService executorService = Executors.newCachedThreadPool(); +private final AtomicReference errorOutput = new AtomicReference<>(); +private final AtomicReference outputBytes = new AtomicReference<>(); + + +public QualifiedTableName initializeSchema() +{ +return initializeSchema(ImmutableMap.of("datacenter1", 1)); +} + +public QualifiedTableName initializeSchema(Map rf) +{ +createTestKeyspace(rf); +return createTestTable(createTableStmt); +} + +public Set getDataForRange(Range range) +{ +// Iterate through all data entries; filter only entries that belong to range; convert to strings +return generateExpectedData().stream() + .filter(t -> range.contains(t._1().getToken())) +
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419767330 ## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java: ## @@ -231,4 +273,84 @@ protected void waitForKeyspaceAndTable(String keyspaceName, String tableName) String.format("Keyspace/table %s/%s did not become visible on all sidecar instances", keyspaceName, tableName)); } + +/** + * A {@link DnsResolver} instance used for tests that provides fast DNS resolution, to avoid blocking + * DNS resolution at the JDK/OS-level. + * + * NOTE: The resolver assumes that the addresses are of the form 127.0.0.x, which is what is currently + * configured for integration tests. + */ +static class FastDnsResolver implements DnsResolver Review Comment: Renamed to `LocalhostResolver`
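The `LocalhostResolver` (formerly `FastDnsResolver`) avoids blocking JDK/OS-level lookups because the test addresses are of the form 127.0.0.x. The sketch below shows the core idea using `InetAddress.getByAddress(String, byte[])`, which constructs an address without consulting a name service. `LocalhostAddresses` is hypothetical and does not implement the sidecar `DnsResolver` interface, whose methods are not shown here.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class LocalhostAddresses
{
    private static final String PREFIX = "127.0.0.";

    // Builds an InetAddress for a 127.0.0.x literal without any DNS lookup. Supplying the
    // literal as the host name too means getHostName() returns it instead of reverse-resolving.
    static InetAddress resolve(String address) throws UnknownHostException
    {
        if (!address.startsWith(PREFIX))
        {
            throw new UnknownHostException("Expected an address of the form 127.0.0.x, got " + address);
        }
        int lastOctet;
        try
        {
            lastOctet = Integer.parseInt(address.substring(PREFIX.length()));
        }
        catch (NumberFormatException e)
        {
            throw new UnknownHostException("Invalid last octet in " + address);
        }
        if (lastOctet < 0 || lastOctet > 255)
        {
            throw new UnknownHostException("Invalid last octet in " + address);
        }
        byte[] bytes = {127, 0, 0, (byte) lastOctet};
        return InetAddress.getByAddress(address, bytes);
    }
}
```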
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419760670 ## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java: ## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.analytics; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Range; + +import com.datastax.driver.core.ConsistencyLevel; +import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations; +import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient; +import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.spark.KryoRegister; +import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.cassandra.spark.bulkwriter.DecoratedKey; +import org.apache.cassandra.spark.bulkwriter.Tokenizer; +import org.apache.cassandra.spark.common.schema.ColumnType; +import org.apache.cassandra.spark.common.schema.ColumnTypes; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import 
org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; +import scala.Tuple2; + +import static junit.framework.TestCase.assertTrue; +import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack; +import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology; +import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Base class for resiliency tests. Contains helper methods for data generation and validation + */ +public abstract class ResiliencyTestBase extends IntegrationTestBase +{ +private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));"; +protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s"; +public static final int rowCount = 1000; + +public QualifiedTableName initializeSchema() +{ +return initializeSchema(ImmutableMap.of("datacenter1", 1)); +} + +public QualifiedTableName initializeSchema(Map rf) +{ +createTestKeyspace(rf); +return createTestTable(createTableStmt); +} + +public SparkConf generateSparkConf() +{ +SparkConf sparkConf = new SparkConf() + .setAppName("Integration test Spark Cassandra Bulk Reader Job") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.master", "local[8,4]"); +BulkSparkConf.setupSparkConf(sparkConf, true); +KryoRegister.setup(sparkConf); +return sparkConf; +} + +public SparkSession generateSparkSession(SparkConf sparkConf) +{ +return SparkSession.builder() + .config(sparkConf) +
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
yifan-c commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419342286 ## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningDoubleClusterTest.java: ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.analytics.expansion; + +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.jupiter.api.TestInfo; + +import com.datastax.driver.core.ConsistencyLevel; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.analytics.TestUninterruptibles; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; +import org.apache.cassandra.utils.Shared; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + + +public class JoiningDoubleClusterTest extends JoiningTestBase +{ +@CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false) +void oneReadAllWrite(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception +{ +BBHelperDoubleClusterSize.reset(); +runJoiningTestScenario(cassandraTestContext, + BBHelperDoubleClusterSize::install, + BBHelperDoubleClusterSize.transientStateStart, + BBHelperDoubleClusterSize.transientStateEnd, + ConsistencyLevel.ONE, + ConsistencyLevel.ALL, + false, + testInfo.getDisplayName()); +} + +@CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false) +void oneReadAllWriteFailure(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception +{ 
+BBHelperDoubleClusterSizeFailure.reset(); +runJoiningTestScenario(cassandraTestContext, + BBHelperDoubleClusterSizeFailure::install, + BBHelperDoubleClusterSizeFailure.transientStateStart, + BBHelperDoubleClusterSizeFailure.transientStateEnd, + ConsistencyLevel.ONE, + ConsistencyLevel.ALL, + true, + testInfo.getDisplayName()); +} + +@CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false) +void quorumReadQuorumWrite(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception +{ +BBHelperDoubleClusterSize.reset(); +runJoiningTestScenario(cassandraTestContext, + BBHelperDoubleClusterSize::install, + BBHelperDoubleClusterSize.transientStateStart, + BBHelperDoubleClusterSize.transientStateEnd, + ConsistencyLevel.QUORUM, + ConsistencyLevel.QUORUM, + false, + testInfo.getDisplayName()); +} + +@CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false) +void quorumReadQuorumWriteFailure(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception +{ +BBHelperDoubleClusterSizeFailure.reset(); +runJoiningTestScenario(c
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
yifan-c commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419125362 ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ## @@ -346,19 +366,22 @@ void writeBuffered() private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator> data, - String[] columnNames) + String[] columnNames) throws InterruptedException { validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES); } private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator> data, String[] columnNames, - int uploadedTables) + int uploadedTables) throws InterruptedException { RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new); rw.write(data); +// Wait for uploads to finish +Thread.sleep(500); Review Comment: Thanks for the improvement! ## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ## @@ -346,19 +366,22 @@ void writeBuffered() private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator> data, - String[] columnNames) + String[] columnNames) throws InterruptedException { validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES); } private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator> data, String[] columnNames, - int uploadedTables) + int uploadedTables) throws InterruptedException { RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new); rw.write(data); +// Wait for uploads to finish +Thread.sleep(500); Review Comment: Thanks for making the improvement!
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1417772899 ## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/CassandraIntegrationTest.java: ## @@ -59,6 +59,13 @@ */ int numDcs() default 1; +/** + * This is only applied in context of multi-DC tests. Returns true if the keyspace is replicated + * across multiple DCs. Defaults to {@code true} + * @return whether the multi-DC test uses a cross-DC keyspace + */ +boolean useCrossDcKeyspace() default true; Review Comment: Agreed. We can revisit this when we refactor these helpers to make them less error-prone (minimizing the number of params being passed). ## cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningBaseTest.java: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.analytics.expansion; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import com.google.common.util.concurrent.Uninterruptibles; + +import com.datastax.driver.core.ConsistencyLevel; +import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName; +import org.apache.cassandra.analytics.ResiliencyTestBase; +import org.apache.cassandra.analytics.TestTokenSupplier; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static junit.framework.TestCase.assertNotNull; +import static org.assertj.core.api.Assertions.assertThat; + +public class JoiningBaseTest extends ResiliencyTestBase Review Comment: In these scenario specific base classes, we're only grouping common functionality instead of defining a contract for subclasses to implement, so this seemed appropriate.
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1417772294 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -132,37 +133,32 @@ public List write(Iterator> sourceI Map valueMap = new HashMap<>(); try { -List exclusions = failureHandler.getFailedInstances(); Set> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() .stream() - .filter(e -> !exclusions.contains(e.getValue())) .map(Map.Entry::getKey) .collect(Collectors.toSet()); +Range tokenRange = getTokenRange(taskContext); +Set> subRanges = newRanges.contains(tokenRange) ? + Collections.singleton(tokenRange) : + getIntersectingSubRanges(newRanges, tokenRange); while (dataIterator.hasNext()) { Tuple2 rowData = dataIterator.next(); -streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler); - -sessions.add(streamSession); +streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results); maybeCreateTableWriter(partitionId, baseDir); writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange()); checkBatchSize(streamSession, partitionId, job); } -// Finalize SSTable for the last StreamSession -if (sstableWriter != null || (streamSession != null && batchSize != 0)) +// Cleanup SSTable writer and schedule the last stream Review Comment: Makes sense. 
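The revised `write()` in this hunk computes the task's sub-ranges once, before the row loop, so each row only has to find the sub-range that owns its token. Below is a minimal, stdlib-only sketch of that flow. `SubRangeSketch`, `TokenRange`, `subRangesFor` and `matchingSubRange` are hypothetical names (the PR uses Guava `Range<BigInteger>` and `getIntersectingSubRanges`), and the sketch assumes Cassandra-style open-closed ranges `(start, end]`.

```java
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, stdlib-only sketch; the PR itself works with Guava Range<BigInteger>.
class SubRangeSketch
{
    /** A token range modeled as the open-closed interval (start, end]. */
    static final class TokenRange
    {
        final BigInteger start;
        final BigInteger end;

        TokenRange(long start, long end)
        {
            this.start = BigInteger.valueOf(start);
            this.end = BigInteger.valueOf(end);
        }

        boolean contains(BigInteger token)
        {
            return token.compareTo(start) > 0 && token.compareTo(end) <= 0;
        }

        /** (a, b] and (c, d] share at least one token iff max(a, c) < min(b, d). */
        boolean intersects(TokenRange other)
        {
            return start.max(other.start).compareTo(end.min(other.end)) < 0;
        }

        @Override
        public boolean equals(Object o)
        {
            return o instanceof TokenRange
                   && start.equals(((TokenRange) o).start)
                   && end.equals(((TokenRange) o).end);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(start, end);
        }

        @Override
        public String toString()
        {
            return "(" + start + ", " + end + "]";
        }
    }

    /** Computed once per task, before iterating rows. */
    static Set<TokenRange> subRangesFor(Set<TokenRange> writeRanges, TokenRange taskRange)
    {
        if (writeRanges.contains(taskRange))
        {
            return Collections.singleton(taskRange);
        }
        return writeRanges.stream()
                          .filter(r -> r.intersects(taskRange))
                          .collect(Collectors.toSet());
    }

    /** Per row: the sub-range owning the row's token, if any. */
    static Optional<TokenRange> matchingSubRange(Set<TokenRange> subRanges, long token)
    {
        BigInteger t = BigInteger.valueOf(token);
        return subRanges.stream().filter(r -> r.contains(t)).findFirst();
    }

    public static void main(String[] args)
    {
        // Example: a pending node has split the task range (0, 100] into two write ranges
        Set<TokenRange> writeRanges = new HashSet<>();
        writeRanges.add(new TokenRange(0, 50));
        writeRanges.add(new TokenRange(50, 100));
        writeRanges.add(new TokenRange(100, 200));

        Set<TokenRange> subRanges = subRangesFor(writeRanges, new TokenRange(0, 100));
        System.out.println(subRanges.size());                      // 2
        System.out.println(matchingSubRange(subRanges, 75).get()); // (50, 100]
    }
}
```

The `max(start) < min(end)` test is meant to play the same role as the PR's `isConnected(...) && !intersection(...).isEmpty()` filter for open-closed ranges: a range that only touches the task range at a boundary token, such as `(100, 200]`, is excluded.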
## cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java: ## @@ -346,19 +366,22 @@ void writeBuffered() private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator> data, - String[] columnNames) + String[] columnNames) throws InterruptedException { validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES); } private void validateSuccessfulWrite(MockBulkWriterContext writerContext, Iterator> data, String[] columnNames, - int uploadedTables) + int uploadedTables) throws InterruptedException { RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new); rw.write(data); +// Wait for uploads to finish +Thread.sleep(500); Review Comment: In general, I agree that the sleep introduces flakiness. It was added because, when the entire test suite was executed, we saw uploads that had not finished by the time we looked up the number of files uploaded. We could potentially use a latch in the `MockBulkWriterContext` to make this more deterministic. Will explore some more.
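One way the latch idea could look: the mock counts down a `CountDownLatch` as each asynchronous upload completes, and the test awaits it with a timeout instead of sleeping a fixed 500 ms. This is a hypothetical sketch; `MockUploader`, `uploadAsync` and `awaitUploads` are illustrative names, not the actual `MockBulkWriterContext` API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a mock writer context whose uploads complete asynchronously.
class MockUploader
{
    private final CountDownLatch uploadsDone;
    private final AtomicInteger uploaded = new AtomicInteger();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    MockUploader(int expectedUploads)
    {
        this.uploadsDone = new CountDownLatch(expectedUploads);
    }

    /** Simulates an asynchronous upload (the file name is unused in this sketch). */
    void uploadAsync(String file)
    {
        executor.submit(() -> {
            try
            {
                Thread.sleep(10); // simulated I/O latency
                uploaded.incrementAndGet();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
            finally
            {
                uploadsDone.countDown(); // always signal, even if the upload was interrupted
            }
        });
    }

    /** Waits until every expected upload has finished, or the timeout elapses. */
    boolean awaitUploads(long timeout, TimeUnit unit) throws InterruptedException
    {
        return uploadsDone.await(timeout, unit);
    }

    int uploadedFiles()
    {
        return uploaded.get();
    }

    void shutdown()
    {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException
    {
        MockUploader uploader = new MockUploader(3);
        for (int i = 0; i < 3; i++)
        {
            uploader.uploadAsync("sstable-" + i);
        }
        // Returns as soon as all three uploads are recorded, instead of after a fixed sleep
        boolean finished = uploader.awaitUploads(5, TimeUnit.SECONDS);
        System.out.println(finished + " " + uploader.uploadedFiles()); // true 3
        uploader.shutdown();
    }
}
```

The timeout bounds how long a broken test can hang, and in the common case the test proceeds as soon as the last upload completes.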
## cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java: ## @@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws NoSuchElementException * @return instance meta information * @throws NoSuchElementException when the instance for {@code host} does not exist */ +@Override public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException { -return cassandraTestContext.instancesConfig.instanceFromHost(host); +return cassandraTestContext.instancesConfig().instanceFromHost(host); } } @Provides @Singleton public SidecarConfiguration sidecarConfiguration() { -return new SidecarConfigurationImpl(new ServiceConfigurationImpl("127.0.0.1")); +ServiceConfiguration conf = ServiceConfigurationImpl.builder() +.host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long Review Comment: Will defer this one to @JeetKunDoug.
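The inline comment on `.host("0.0.0.0")` notes that the wildcard address listens on every interface, whereas the previous `127.0.0.1` was reachable only locally. A small illustration of that distinction using only `java.net.InetAddress`. `BindAddressCheck.classify` is a hypothetical helper, for example one that a test module could use to warn when it binds beyond loopback. It is not part of Sidecar's `ServiceConfiguration`.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper; not part of Sidecar's ServiceConfiguration API.
class BindAddressCheck
{
    /**
     * Classifies a bind address: the wildcard address accepts connections on every
     * interface, while a loopback address is reachable only from the local host.
     */
    static String classify(String host) throws UnknownHostException
    {
        // For IP literals, getByName only validates the format; no DNS lookup is made
        InetAddress address = InetAddress.getByName(host);
        if (address.isAnyLocalAddress())
        {
            return "all-interfaces";
        }
        if (address.isLoopbackAddress())
        {
            return "loopback-only";
        }
        return "specific-interface";
    }

    public static void main(String[] args) throws UnknownHostException
    {
        System.out.println(classify("0.0.0.0"));   // all-interfaces
        System.out.println(classify("127.0.0.1")); // loopback-only
    }
}
```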
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
yifan-c commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1416591381 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -207,46 +203,62 @@ private Set instancesFromMapping(Map, List rowData, - Set> newRanges, - ReplicaAwareFailureHandler failureHandler) throws IOException + Set> subRanges, + ReplicaAwareFailureHandler failureHandler, + List results) +throws IOException, ExecutionException, InterruptedException { BigInteger token = rowData._1().getToken(); Range tokenRange = getTokenRange(taskContext); Preconditions.checkState(tokenRange.contains(token), String.format("Received Token %s outside of expected range %s", token, tokenRange)); -// token range for this partition is not among the write-replica-set ranges -if (!newRanges.contains(tokenRange)) +// We have split ranges likely resulting from pending nodes +// Evaluate creating a new session if the token from current row is part of a sub-range +if (subRanges.size() > 1) { -Set> subRanges = getIntersectingSubRanges(newRanges, tokenRange); -// We have split ranges - likely resulting from pending nodes -if (subRanges.size() > 1) -{ -// Create session using sub-range that contains the token from current row -Range matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); -Preconditions.checkState(matchingRange != null, - String.format("Received Token %s outside of expected range %s", token, matchingRange)); +// Create session using sub-range that contains the token from current row +Range matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); +Preconditions.checkState(matchingSubRange != null, + String.format("Received Token %s outside of expected range %s", token, matchingSubRange)); Review Comment: The `checkState` will not ever see `matchingSubRange == null`. 
The reason is that at line#222, if no sub-range matches, the `get()` call already throws an exception. If the intent is to provide a more user-friendly error message, can you avoid calling `get()`, capture the result in `Optional> matchingSubRangeOpt`, and run `checkState` on the optional? ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -132,37 +133,32 @@ public List write(Iterator> sourceI Map valueMap = new HashMap<>(); try { -List exclusions = failureHandler.getFailedInstances(); Set> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() .stream() - .filter(e -> !exclusions.contains(e.getValue())) .map(Map.Entry::getKey) .collect(Collectors.toSet()); +Range tokenRange = getTokenRange(taskContext); +Set> subRanges = newRanges.contains(tokenRange) ? + Collections.singleton(tokenRange) : + getIntersectingSubRanges(newRanges, tokenRange); while (dataIterator.hasNext()) { Tuple2 rowData = dataIterator.next(); -streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler); - -sessions.add(streamSession); +streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results); maybeCreateTableWriter(partitionId, baseDir); writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange()); checkBatchSize(streamSession, partitionId, job); } -// Finalize SSTable for the last StreamSession -if (sstableWriter != null || (streamSession != null && batchSize != 0)) +// Cleanup SSTable writer and schedule the last stream Review Comment: "Cleanup SSTable writer" reads wrong to me. I would stick with "Finalize". The code flushes any remaining data to the SSTable by closing the writer. Cleanup leads me to th
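The point above is that `findFirst().get()` throws `NoSuchElementException` before `checkState` ever runs. A minimal sketch of the suggested shape, which keeps the `Optional` and checks it explicitly. To stay stdlib-only, it throws `IllegalStateException` directly, which is the exception Guava's `Preconditions.checkState` throws. `MatchingSubRangeSketch` and its `TokenRange` are hypothetical stand-ins for the PR's Guava `Range` types. The message lists the candidate sub-ranges, since there is no matched range to report.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class MatchingSubRangeSketch
{
    /** Hypothetical open-closed token range (start, end]. */
    static final class TokenRange
    {
        final long start;
        final long end;

        TokenRange(long start, long end)
        {
            this.start = start;
            this.end = end;
        }

        boolean contains(long token)
        {
            return token > start && token <= end;
        }

        @Override
        public String toString()
        {
            return "(" + start + ", " + end + "]";
        }
    }

    static TokenRange matchingSubRange(List<TokenRange> subRanges, long token)
    {
        // Keep the Optional instead of calling get(), so an empty result reaches the check
        Optional<TokenRange> matchingSubRangeOpt = subRanges.stream()
                                                            .filter(r -> r.contains(token))
                                                            .findFirst();
        // Same effect as Preconditions.checkState(matchingSubRangeOpt.isPresent(), message)
        if (!matchingSubRangeOpt.isPresent())
        {
            throw new IllegalStateException(
                String.format("Received Token %s outside of expected sub-ranges %s", token, subRanges));
        }
        return matchingSubRangeOpt.get();
    }

    public static void main(String[] args)
    {
        List<TokenRange> subRanges = Arrays.asList(new TokenRange(0, 50), new TokenRange(50, 100));
        System.out.println(matchingSubRange(subRanges, 75)); // (50, 100]
        try
        {
            matchingSubRange(subRanges, 150);
        }
        catch (IllegalStateException e)
        {
            // Received Token 150 outside of expected sub-ranges [(0, 50], (50, 100]]
            System.out.println(e.getMessage());
        }
    }
}
```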
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411436951 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -136,35 +185,122 @@ public StreamResult write(Iterator> sourceIterato } } +private Map, List> taskTokenRangeMapping(TokenRangeMapping tokenRange, + Range taskTokenRange) +{ +return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges(); +} + +private Set instancesFromMapping(Map, List> mapping) +{ +return mapping.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); +} + +/** + * Creates a new session if we have the current token range intersecting the ranges from write replica-set. + * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range + * if the token from the row data belongs to the range. + */ +private StreamSession maybeCreateStreamSession(TaskContext taskContext, + StreamSession streamSession, + Tuple2 rowData, + Set> newRanges, + ReplicaAwareFailureHandler failureHandler) throws IOException +{ +BigInteger token = rowData._1().getToken(); +Range tokenRange = getTokenRange(taskContext); + +Preconditions.checkState(tokenRange.contains(token), + String.format("Received Token %s outside of expected range %s", token, tokenRange)); + +// token range for this partition is not among the write-replica-set ranges +if (!newRanges.contains(tokenRange)) Review Comment: Makes sense. Pulled this out of the loop.
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411436190 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -110,20 +132,47 @@ public StreamResult write(Iterator> sourceIterato Map valueMap = new HashMap<>(); try { +List exclusions = failureHandler.getFailedInstances(); +Set> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() + .stream() + .filter(e -> !exclusions.contains(e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + while (dataIterator.hasNext()) { +Tuple2 rowData = dataIterator.next(); +streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler); + +sessions.add(streamSession); maybeCreateTableWriter(partitionId, baseDir); -writeRow(valueMap, dataIterator, partitionId, range); +writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange()); checkBatchSize(streamSession, partitionId, job); } -if (sstableWriter != null) +// Finalize SSTable for the last StreamSession +if (sstableWriter != null || (streamSession != null && batchSize != 0)) Review Comment: Good point, removing redundant checks.
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411436061 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -110,20 +132,47 @@ public StreamResult write(Iterator> sourceIterato Map valueMap = new HashMap<>(); try { +List exclusions = failureHandler.getFailedInstances(); +Set> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() + .stream() + .filter(e -> !exclusions.contains(e.getValue())) Review Comment: Good catch. Evaluating if we even need this filter anymore. Will update.
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411327830 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -110,20 +132,47 @@ public StreamResult write(Iterator> sourceIterato Map valueMap = new HashMap<>(); try { +List exclusions = failureHandler.getFailedInstances(); +Set> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet() + .stream() + .filter(e -> !exclusions.contains(e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + while (dataIterator.hasNext()) { +Tuple2 rowData = dataIterator.next(); +streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler); + +sessions.add(streamSession); Review Comment: This was done to separate the session closures instead of having it scattered with checks for non-existent sessions. However, on looking further it seems like we do not really need to eagerly create the session for the partition's token range. Will update for it to be created lazily.
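A hypothetical sketch of the lazy approach described in the reply. No session exists until the first row arrives, and the open session is reused while rows stay in the same sub-range. The pending batch is finalized before switching to a new sub-range and once more after the loop. `LazySessions`, `Session` and `finalizeBatch` are illustrative names rather than the PR's `StreamSession` API, and sub-ranges are represented by string labels for brevity.

```java
import java.util.ArrayList;
import java.util.List;

class LazySessions
{
    /** Hypothetical stand-in for a stream session: one per sub-range. */
    static final class Session
    {
        final String subRange;
        int rows;

        Session(String subRange)
        {
            this.subRange = subRange;
        }
    }

    private Session current;                          // stays null until the first row arrives
    final List<String> finalized = new ArrayList<>(); // "subRange:rows" per finalized batch

    /** Returns the session for the row's sub-range, creating it lazily. */
    Session sessionFor(String rowSubRange)
    {
        if (current != null && current.subRange.equals(rowSubRange))
        {
            return current; // same sub-range: keep writing to the open session
        }
        finalizeBatch();    // sub-range changed: flush the previous batch, if any
        current = new Session(rowSubRange);
        return current;
    }

    void write(String rowSubRange)
    {
        sessionFor(rowSubRange).rows++;
    }

    /** Finalizes the open batch only if it holds rows, then closes the session. */
    void finalizeBatch()
    {
        if (current != null && current.rows != 0)
        {
            finalized.add(current.subRange + ":" + current.rows);
        }
        current = null;
    }

    public static void main(String[] args)
    {
        LazySessions sessions = new LazySessions();
        // Rows arrive in token order, so each sub-range is visited in one contiguous run
        for (String subRange : new String[]{ "(0,50]", "(0,50]", "(50,100]" })
        {
            sessions.write(subRange);
        }
        sessions.finalizeBatch(); // finalize the last session after the loop
        System.out.println(sessions.finalized); // [(0,50]:2, (50,100]:1]
    }
}
```

Creating the session on demand avoids opening a session that may never receive rows, which is what made the eager `sessions.add(streamSession)` bookkeeping necessary.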
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1409995629 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -136,35 +185,122 @@ public StreamResult write(Iterator> sourceIterato } } +private Map, List> taskTokenRangeMapping(TokenRangeMapping tokenRange, + Range taskTokenRange) +{ +return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges(); +} + +private Set instancesFromMapping(Map, List> mapping) +{ +return mapping.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); +} + +/** + * Creates a new session if we have the current token range intersecting the ranges from write replica-set. + * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range + * if the token from the row data belongs to the range. + */ +private StreamSession maybeCreateStreamSession(TaskContext taskContext, + StreamSession streamSession, + Tuple2 rowData, + Set> newRanges, + ReplicaAwareFailureHandler failureHandler) throws IOException +{ +BigInteger token = rowData._1().getToken(); +Range tokenRange = getTokenRange(taskContext); + +Preconditions.checkState(tokenRange.contains(token), + String.format("Received Token %s outside of expected range %s", token, tokenRange)); + +// token range for this partition is not among the write-replica-set ranges +if (!newRanges.contains(tokenRange)) +{ +Set> subRanges = getIntersectingSubRanges(newRanges, tokenRange); +// We have split ranges - likely resulting from pending nodes +if (subRanges.size() > 1) +{ +// Create session using sub-range that contains the token from current row +Range matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); +Preconditions.checkState(matchingRange != null, + String.format("Received Token %s outside of expected range %s", token, matchingRange)); + +if (streamSession != null && 
streamSession.getTokenRange() == matchingRange) +{ +return streamSession; +} +else +{ +LOGGER.debug(String.format("[{}] Creating stream session for range: %s", matchingRange), taskContext.partitionId()); +if (streamSession != null && batchSize != 0) +{ +finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize); +sstableWriter = null; +batchSize = 0; +} +return new StreamSession(writerContext, getStreamId(taskContext), matchingRange, failureHandler); +} +} +} + +return (streamSession != null) ? streamSession : createStreamSession(taskContext); +} + +/** + * Get ranges from the set that intersect and/or overlap with the provided token range + */ +private Set> getIntersectingSubRanges(Set> ranges, Range tokenRange) +{ +return ranges.stream() + .filter(r -> r.isConnected(tokenRange) && !r.intersection(tokenRange).isEmpty()) + .collect(Collectors.toSet()); +} + +private boolean haveTokenRangeMappingsChanged(TokenRangeMapping startTaskMapping, TaskContext taskContext) +{ +Range taskTokenRange = getTokenRange(taskContext); +// Get the uncached, current view of the ring to compare with initial ring +TokenRangeMapping endTaskMapping = writerContext.cluster().getTokenRangeMapping(false); +Map, List> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange); +Map, List> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange); + +return !(startMapping.keySet().equals(endMapping.keySet()) && + instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping))); Review Comment: Thanks for the input. We're checking whether either the token ranges or the set of instances differ between the start and end of the task, to determine whether it should fail the task (and retry). In your example, if the node is joining at the beginning of the task and has completed join
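The reply describes the check: fail (and retry) the task when either the sub-range keys or the set of replica instances covering the task's range differ between the start and the end of the task. A minimal sketch of that comparison, using hypothetical string keys in place of `Range<BigInteger>` and string instance names in place of the PR's instance type. It uses the `!a || !b` form proposed in review, which is equivalent to the original `!(a && b)` by De Morgan's law.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class MappingChangeSketch
{
    /** Flattens all replica lists into one set of instances. */
    static Set<String> instancesFromMapping(Map<String, List<String>> mapping)
    {
        return mapping.values()
                      .stream()
                      .flatMap(Collection::stream)
                      .collect(Collectors.toSet());
    }

    /** True if the sub-ranges or the participating instances differ between start and end. */
    static boolean haveMappingsChanged(Map<String, List<String>> start, Map<String, List<String>> end)
    {
        return !start.keySet().equals(end.keySet())
               || !instancesFromMapping(start).equals(instancesFromMapping(end));
    }

    public static void main(String[] args)
    {
        Map<String, List<String>> start = new HashMap<>();
        start.put("(0,100]", Arrays.asList("n1", "n2", "n3"));

        // Hypothetical: a join completing mid-task splits the range into two sub-ranges
        Map<String, List<String>> afterSplit = new HashMap<>();
        afterSplit.put("(0,50]", Arrays.asList("n4", "n1", "n2"));
        afterSplit.put("(50,100]", Arrays.asList("n1", "n2", "n3"));

        // Hypothetical: same range, but n3 was replaced by n5
        Map<String, List<String>> afterReplace = new HashMap<>();
        afterReplace.put("(0,100]", Arrays.asList("n1", "n2", "n5"));

        System.out.println(haveMappingsChanged(start, start));        // false
        System.out.println(haveMappingsChanged(start, afterSplit));   // true (ranges differ)
        System.out.println(haveMappingsChanged(start, afterReplace)); // true (instances differ)
    }
}
```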
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
yifan-c commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1409303414 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java: ## @@ -136,35 +185,122 @@ public StreamResult write(Iterator> sourceIterato } } +private Map, List> taskTokenRangeMapping(TokenRangeMapping tokenRange, + Range taskTokenRange) +{ +return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges(); +} + +private Set instancesFromMapping(Map, List> mapping) +{ +return mapping.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); +} + +/** + * Creates a new session if we have the current token range intersecting the ranges from write replica-set. + * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range + * if the token from the row data belongs to the range. + */ +private StreamSession maybeCreateStreamSession(TaskContext taskContext, + StreamSession streamSession, + Tuple2 rowData, + Set> newRanges, + ReplicaAwareFailureHandler failureHandler) throws IOException +{ +BigInteger token = rowData._1().getToken(); +Range tokenRange = getTokenRange(taskContext); + +Preconditions.checkState(tokenRange.contains(token), + String.format("Received Token %s outside of expected range %s", token, tokenRange)); + +// token range for this partition is not among the write-replica-set ranges +if (!newRanges.contains(tokenRange)) +{ +Set> subRanges = getIntersectingSubRanges(newRanges, tokenRange); +// We have split ranges - likely resulting from pending nodes +if (subRanges.size() > 1) +{ +// Create session using sub-range that contains the token from current row +Range matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get(); +Preconditions.checkState(matchingRange != null, + String.format("Received Token %s outside of expected range %s", token, matchingRange)); + +if (streamSession != null && 
streamSession.getTokenRange() == matchingRange) +{ +return streamSession; +} +else +{ +LOGGER.debug(String.format("[{}] Creating stream session for range: %s", matchingRange), taskContext.partitionId()); +if (streamSession != null && batchSize != 0) +{ +finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize); +sstableWriter = null; +batchSize = 0; +} +return new StreamSession(writerContext, getStreamId(taskContext), matchingRange, failureHandler); +} +} +} + +return (streamSession != null) ? streamSession : createStreamSession(taskContext); +} + +/** + * Get ranges from the set that intersect and/or overlap with the provided token range + */ +private Set> getIntersectingSubRanges(Set> ranges, Range tokenRange) +{ +return ranges.stream() + .filter(r -> r.isConnected(tokenRange) && !r.intersection(tokenRange).isEmpty()) + .collect(Collectors.toSet()); +} + +private boolean haveTokenRangeMappingsChanged(TokenRangeMapping startTaskMapping, TaskContext taskContext) +{ +Range taskTokenRange = getTokenRange(taskContext); +// Get the uncached, current view of the ring to compare with initial ring +TokenRangeMapping endTaskMapping = writerContext.cluster().getTokenRangeMapping(false); +Map, List> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange); +Map, List> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange); + +return !(startMapping.keySet().equals(endMapping.keySet()) && + instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping))); Review Comment: nit: I think this is easier to read with fewer parentheses. ```suggestion return !Objects.equals(startMapping.keySet(), endMapping.keySet()) || !Objects.equals(instancesFromMapping(startMapping), instancesFromMapping(endMapping)); ``` A follow-up question. I
Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]
arjunashok commented on code in PR #17: URL: https://github.com/apache/cassandra-analytics/pull/17#discussion_r1372321724 ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ## @@ -20,18 +20,43 @@ package org.apache.cassandra.spark.bulkwriter.token; import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.spark.common.model.CassandraInstance; import org.apache.cassandra.spark.data.ReplicationFactor; public interface ConsistencyLevel { boolean isLocal(); -boolean checkConsistency(Collection failedInsts, ReplicationFactor replicationFactor, String localDC); +Logger LOGGER = LoggerFactory.getLogger(ConsistencyLevel.class); + +/** + * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor. + * + * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes. + * This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy. + * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy + * (and non-pending) nodes. + * + * For both the above cases, blocked instances are also considered as failures while performing consistency checks. + * Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances. + * This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes. 
+ */ +boolean checkConsistency(CassandraRing ring, + TokenRangeMapping tokenRangeMapping, Review Comment: Addressed ## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java: ## @@ -82,19 +119,25 @@ public boolean checkConsistency(Collection failedIn }, QUORUM { +// Keyspaces exist with RF 1 or 2 @Override public boolean isLocal() { return false; } @Override -public boolean checkConsistency(Collection failedInsts, -ReplicationFactor replicationFactor, -String localDC) +public boolean checkConsistency(final CassandraRing ring, +final TokenRangeMapping tokenRangeMapping, +final Collection failedInsts, +final String localDC) { -int rf = replicationFactor.getTotalReplicationFactor(); -return failedInsts.size() <= (rf - (rf / 2 + 1)); +Set replacingInstances = tokenRangeMapping.getReplacementInstances(); +Set failedInstanceIPs = failedInsts.stream().map(CassandraInstance::getIpAddress).collect(Collectors.toSet()); +final long writeReplicaCount = maybeUpdateWriteReplicasForReplacements(tokenRangeMapping.getWriteReplicas(), + replacingInstances, + failedInstanceIPs); Review Comment: Addressed
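The diff replaces the RF-based rule `failedInsts.size() <= (rf - (rf / 2 + 1))` with one based on a write-replica count, which the javadoc says includes healthy and pending nodes. The sketch below contrasts the two under stated assumptions. `QuorumCheckSketch` and its methods are hypothetical, nodes are plain strings, and blocked instances are counted as failures as the javadoc describes. Replacement nodes that have not failed are simply dropped from the count. The body of `maybeUpdateWriteReplicasForReplacements` is not shown in this thread, so that last rule is a guess, not the PR's implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class QuorumCheckSketch
{
    /** Failures a QUORUM write tolerates across n replicas: n - (n / 2 + 1). */
    static long toleratedFailures(long replicaCount)
    {
        return replicaCount - (replicaCount / 2 + 1);
    }

    /** Pre-PR rule: tolerance derived from the replication factor alone. */
    static boolean checkQuorumByRf(int failedCount, int rf)
    {
        return failedCount <= toleratedFailures(rf);
    }

    /**
     * Sketch of the PR's direction: tolerance derived from the write-replica set, with blocked
     * instances counted as failures. ASSUMPTION: non-failed replacement nodes are dropped from
     * the replica count; the real maybeUpdateWriteReplicasForReplacements may differ.
     */
    static boolean checkQuorumByWriteReplicas(Set<String> writeReplicas,
                                              Set<String> replacing,
                                              Set<String> failed,
                                              Set<String> blocked)
    {
        Set<String> failures = new HashSet<>(failed);
        failures.addAll(blocked);

        Set<String> counted = new HashSet<>(writeReplicas);
        for (String node : replacing)
        {
            if (!failed.contains(node))
            {
                counted.remove(node);
            }
        }
        return failures.size() <= toleratedFailures(counted.size());
    }

    public static void main(String[] args)
    {
        Set<String> none = Collections.emptySet();
        // Hypothetical: RF 3 plus one pending (joining) node n4 gives 4 write replicas
        Set<String> joining = new HashSet<>(Arrays.asList("n1", "n2", "n3", "n4"));

        System.out.println(checkQuorumByRf(1, 3));                                                // true: 3 - 2 = 1
        System.out.println(checkQuorumByWriteReplicas(joining, none, Collections.singleton("n4"), none)); // true: 4 - 3 = 1
        System.out.println(checkQuorumByWriteReplicas(joining, none,
                                                      new HashSet<>(Arrays.asList("n3", "n4")), none));   // false
    }
}
```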