Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


arjunashok commented on PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850889132

   Attempt to reopen PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


arjunashok commented on PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850889037

   Attempt to reopen PR





Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


arjunashok commented on PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850885476

   Attempt to reopen PR





Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


arjunashok commented on PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850873871

   Reopen attempt





Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


arjunashok commented on PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#issuecomment-1850873520

   Test





Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


arjunashok closed pull request #17: Cassandra 18852: Make bulk writer resilient to cluster resize events
URL: https://github.com/apache/cassandra-analytics/pull/17





Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


yifan-c commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422438882


##
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java:
##
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations;
+import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
+import org.apache.cassandra.spark.bulkwriter.Tokenizer;
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.common.schema.ColumnTypes;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import scala.Tuple2;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for resiliency tests. Contains helper methods for data generation and validation
+ */
+public abstract class ResiliencyTestBase extends IntegrationTestBase
+{
+    public static final int rowCount = 1000;
+    protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(ResiliencyTestBase.class);
+    private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));";
+
+    private final ExecutorService executorService = Executors.newCachedThreadPool();
+    private final AtomicReference<String> errorOutput = new AtomicReference<>();
+    private final AtomicReference<byte[]> outputBytes = new AtomicReference<>();
+
+    public QualifiedTableName initializeSchema()
+    {
+        return initializeSchema(ImmutableMap.of("datacenter1", 1));
+    }
+
+    public QualifiedTableName initializeSchema(Map<String, Integer> rf)
+    {
+        createTestKeyspace(rf);
+        return createTestTable(createTableStmt);
+    }
+
+    public Set<String> getDataForRange(Range<BigInteger> range)
+    {
+        // Iterate through all data entries; filter only entries that belong to range; convert to strings
+        return generateExpectedData().stream()
+                                     .filter(t -> range.contains(t._1().getToken()))

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-11 Thread via GitHub


yifan-c commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422429361


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -339,140 +336,188 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
-        {
-            return currentRingResponse;
-        }
-
-        synchronized (this)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        Map<String, ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
         {
-            if (ringResponse == null)
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
+
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .values()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata()
+                                       .values()
+                                       .stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-10 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422019437


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -59,42 +87,34 @@ public boolean isLocal()
     }
 
     @Override
-    public boolean checkConsistency(Collection<CassandraInstance> failedInsts,
-                                    ReplicationFactor replicationFactor,
+    public boolean checkConsistency(Set<String> writeReplicas,
+                                    Set<String> pendingReplicas,
+                                    Set<String> replacementInstances,
+                                    Set<String> blockedInstances,
+                                    Set<String> failedInstanceIps,
                                     String localDC)
     {
-        Preconditions.checkArgument(replicationFactor.getReplicationStrategy() != ReplicationFactor.ReplicationStrategy.SimpleStrategy,
-                                    "EACH_QUORUM doesn't make sense for SimpleStrategy keyspaces");
-
-        for (String datacenter : replicationFactor.getOptions().keySet())
-        {
-            int rf = replicationFactor.getOptions().get(datacenter);
-            if (failedInsts.stream()
-                           .filter(instance -> instance.getDataCenter().matches(datacenter))
-                           .count() > (rf - (rf / 2 + 1)))
-            {
-                return false;
-            }
-        }
-
-        return true;
+        return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));

Review Comment:
   Correct, the idea was to keep this class confined to CL validation
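
   For illustration, the arithmetic in the new return statement reduces to the following check (a minimal, self-contained sketch with illustrative names, not the PR's API): with three write replicas the quorum is 3/2 + 1 = 2, so at most one replica may be failed or blocked.

   final class QuorumMath
   {
       // Enough replicas remain when failures and blocks stay within the tolerable loss.
       static boolean quorumSatisfied(int writeReplicas, int failed, int blocked)
       {
           int quorum = writeReplicas / 2 + 1;                 // majority of the write replica set
           return failed + blocked <= writeReplicas - quorum;  // tolerable losses
       }

       public static void main(String[] args)
       {
           System.out.println(quorumSatisfied(3, 1, 0)); // true: one loss is tolerable
           System.out.println(quorumSatisfied(3, 1, 1)); // false: quorum can no longer be met
       }
   }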






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-10 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422015166


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CommitCoordinator.java:
##
@@ -128,61 +128,51 @@ private Stream<CompletableFuture<CommitResult>> commit(Map<String, Range<BigInteger>> uploadRanges)
 {
-        if (cluster.instanceIsAvailable(instance))

Review Comment:
   The expectation is for the instance to be available or transitioning at this 
point. If the instance does happen to be down, it should result in a commit 
failure, which is further evaluated against the CL requirements.
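
   A hedged sketch of that flow (a fragment with illustrative names; `addFailure` and `validateClOrFail` appear in diffs elsewhere in this thread, the commit call itself is assumed): the commit is attempted regardless of transient state, and a failure is recorded for later consistency-level evaluation rather than aborting eagerly.

   void commitWithFailureTracking(RingInstance instance,
                                  Range<BigInteger> range,
                                  ReplicaAwareFailureHandler<RingInstance> failureHandler)
   {
       try
       {
           commitOnInstance(instance, range);   // hypothetical commit call to the sidecar
       }
       catch (RuntimeException e)
       {
           // The failed range/instance is judged against the CL requirements later,
           // e.g. via BulkWriteValidator.validateClOrFail(...)
           failureHandler.addFailure(range, instance, e.getMessage());
       }
   }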






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-10 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422010350


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java:
##
@@ -94,6 +95,12 @@ public Object[] normalize(Object[] row)
     }
 
     public Object[] getKeyColumns(Object[] allColumns)
+    {
+        return getKeyColumns(allColumns, keyFieldPositions);
+    }
+
+    @NotNull
+    public static Object[] getKeyColumns(Object[] allColumns, List<Integer> keyFieldPositions)

Review Comment:
   Testing only. Added






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-10 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422005002


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##
@@ -339,140 +336,190 @@ public String getLowestCassandraVersion()
         return cassandraVersion;
     }
 
-    public String getVersionFromFeature()
+    @Override
+    public Map<RingInstance, InstanceAvailability> getInstanceAvailability()
     {
-        return null;
+        TokenRangeMapping<RingInstance> mapping = getTokenRangeMapping(true);
+        Map<RingInstance, InstanceAvailability> result =
+        mapping.getReplicaMetadata()
+               .stream()
+               .map(RingInstance::new)
+               .collect(Collectors.toMap(Function.identity(), this::determineInstanceAvailability));
+
+        if (LOGGER.isDebugEnabled())
+        {
+            result.forEach((inst, avail) -> LOGGER.debug("Instance {} has availability {}", inst, avail));
+        }
+        return result;
     }
 
-    protected List<NodeSettings> getAllNodeSettings()
+    private InstanceAvailability determineInstanceAvailability(RingInstance instance)
     {
-        List<NodeSettings> allNodeSettings = FutureUtils.bestEffortGet(allNodeSettingFutures,
-                                                                       conf.getSidecarRequestMaxRetryDelayInSeconds(),
-                                                                       TimeUnit.SECONDS);
-
-        if (allNodeSettings.isEmpty())
+        if (!instanceIsUp(instance.getRingInstance()))
         {
-            throw new RuntimeException(String.format("Unable to determine the node settings. 0/%d instances available.",
-                                                     allNodeSettingFutures.size()));
+            return InstanceAvailability.UNAVAILABLE_DOWN;
         }
-        else if (allNodeSettings.size() < allNodeSettingFutures.size())
+        if (instanceIsBlocked(instance))
         {
-            LOGGER.warn("{}/{} instances were used to determine the node settings",
-                        allNodeSettings.size(), allNodeSettingFutures.size());
+            return InstanceAvailability.UNAVAILABLE_BLOCKED;
         }
-
-        return allNodeSettings;
-    }
-
-    public String getVersionFromSidecar()
-    {
-        NodeSettings nodeSettings = this.nodeSettings.get();
-        if (nodeSettings != null)
+        if (instanceIsNormal(instance.getRingInstance()) ||
+            instanceIsTransitioning(instance.getRingInstance()) ||
+            instanceIsBeingReplaced(instance.getRingInstance()))
         {
-            return nodeSettings.releaseVersion();
+            return InstanceAvailability.AVAILABLE;
         }
 
-        return getLowestVersion(getAllNodeSettings());
+        LOGGER.info("No valid state found for instance {}", instance);
+        // If it's not one of the above, it's inherently INVALID.
+        return InstanceAvailability.INVALID_STATE;
     }
 
-    protected RingResponse getRingResponse()
+    private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
-        RingResponse currentRingResponse = ringResponse;
-        if (currentRingResponse != null)
+        Map<String, Set<String>> writeReplicasByDC;
+        Map<String, Set<String>> pendingReplicasByDC;
+        List<ReplicaMetadata> replicaMetadata;
+        Set<RingInstance> blockedInstances;
+        Set<RingInstance> replacementInstances;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
+        try
         {
-            return currentRingResponse;
-        }
+            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
+            replicaMetadata = response.replicaMetadata();
 
-        synchronized (this)
-        {
-            if (ringResponse == null)
+            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
+            LOGGER.info("Retrieved token ranges for {} instances from write replica set ",
+                        tokenRangesByInstance.size());
+
+            replacementInstances = response.replicaMetadata()
+                                           .stream()
+                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.toString()))
+                                           .map(RingInstance::new)
+                                           .collect(Collectors.toSet());
+
+            blockedInstances = response.replicaMetadata().stream()
+                                       .map(RingInstance::new)
+                                       .filter(this::instanceIsBlocked)
+                                       .collect(Collectors.toSet());
+
+            Set<String> blockedIps = blockedInstances.stream().map(i -> i.getRingInstance().address())
+                                                     .collect(Collectors.toSet());
+
+            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
+            writeReplicasByDC = response.writeReplicas()
+                                        .stream()

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-10 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1422004520


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java:
##

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420983422


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##
@@ -98,39 +95,19 @@ public void insert(@NotNull Dataset<Row> data, boolean overwrite)
         Tokenizer tokenizer = new Tokenizer(writerContext);
         TableSchema tableSchema = writerContext.schema().getTableSchema();
         JavaPairRDD<DecoratedKey, Object[]> sortedRDD = data.toJavaRDD()
-                                                            .map(Row::toSeq)
-                                                            .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
-                                                            .map(tableSchema::normalize)
-                                                            .keyBy(tokenizer::getDecoratedKey)
-                                                            .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
+                                                        .map(Row::toSeq)
+                                                        .map(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().toArray())
+                                                        .map(tableSchema::normalize)
+                                                        .keyBy(tokenizer::getDecoratedKey)
+                                                        .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());

Review Comment:
   Agreed, this is out of scope for this PR. I believe this was applied by the code-style plugin. Will leave it as-is for now.






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420981329


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java:
##
@@ -23,103 +23,120 @@
 import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 
-public class BulkWriteValidator implements AutoCloseable
+public class BulkWriteValidator
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(BulkWriteValidator.class);
 
     private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
-    private final CassandraRingMonitor monitor;
     private final JobInfo job;
     private String phase = "Initializing";
 
     private final ClusterInfo cluster;
 
     public BulkWriteValidator(BulkWriterContext bulkWriterContext,
-                              Consumer<CancelJobEvent> cancelJobFunc) throws Exception
+                              ReplicaAwareFailureHandler<RingInstance> failureHandler)
     {
         cluster = bulkWriterContext.cluster();
         job = bulkWriterContext.job();
-        failureHandler = new ReplicaAwareFailureHandler<>(cluster.getRing(true));
-        monitor = new CassandraRingMonitor(cluster, cancelJobFunc, 1000, TimeUnit.MILLISECONDS);
+        this.failureHandler = failureHandler;
     }
 
-    public void setPhase(String phase)
-    {
-        this.phase = phase;
-    }
-
-    public String getPhase()
-    {
-        return phase;
-    }
-
-    public void validateInitialEnvironment()
-    {
-        validateCLOrFail();
-    }
-
-    public void close()
-    {
-        monitor.stop();
-    }
-
-    private void validateCLOrFail()
-    {
-        updateInstanceAvailability();
-        validateClOrFail(failureHandler, LOGGER, phase, job);
-    }
-
-    public static void validateClOrFail(ReplicaAwareFailureHandler<RingInstance> failureHandler,
+    public static void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping,
+                                        ReplicaAwareFailureHandler<RingInstance> failureHandler,
                                         Logger logger,
                                         String phase,
                                         JobInfo job)
     {
         Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<RingInstance, String>>> failedRanges =
-        failureHandler.getFailedEntries(job.getConsistencyLevel(), job.getLocalDC());
+        failureHandler.getFailedEntries(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
+
         if (failedRanges.isEmpty())
         {
             logger.info("Succeeded {} with {}", phase, job.getConsistencyLevel());
         }
         else
         {
-            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s",
+            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.",
                                            failedRanges.size(), job.getConsistencyLevel(), job.getId(), phase);
             logger.error(message);
-            failedRanges.forEach(failedRange -> failedRange.getValue().keySet().forEach(instance ->
-            logger.error("Failed {} for {} on {}", phase, failedRange.getKey(), instance.toString())));
+            failedRanges.forEach(failedRange ->
+                                 failedRange.getValue()
+                                            .keySet()
+                                            .forEach(instance ->
+                                                     logger.error("Failed {} for {} on {}",
+                                                                  phase,
+                                                                  failedRange.getKey(),
+                                                                  instance.toString())));
             throw new RuntimeException(message);
         }
     }
 
-    public static void updateFailureHandler(CommitResult commitResult,
-                                            String phase,
+    public String getPhase()
+    {
+        return phase;
+    }
+
+    public void setPhase(String phase)
+    {
+        this.phase = phase;
+    }
+
+    public static void updateFailureHandler(CommitResult commitResult, String phase,
                                             ReplicaAwareFailureHandler<RingInstance> failureHandler)
     {
         LOGGER.debug("Commit Result: {}", commitResult);
         commitResult.failures.forEach((uuid, err) -> {
             LOGGER.warn("[{}]: {} failed on {} with message {}",
-                        uuid, phase, commitResult.instance.getNodeName(), err.errMsg);
+                        uuid,

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420975817


##
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java:
##

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420974122


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java:
##
@@ -50,13 +51,36 @@ private TokenRangeMappingUtils()
 
     public static TokenRangeMapping<RingInstance> buildTokenRangeMapping(final int initialToken, final ImmutableMap<String, Integer> rfByDC, int instancesPerDC)
     {
+        return buildTokenRangeMapping(initialToken, rfByDC, instancesPerDC, false, -1);
+    }
 
+    public static TokenRangeMapping<RingInstance> buildTokenRangeMappingWithFailures(int initialToken,
+                                                                                     ImmutableMap<String, Integer> rfByDC,
+                                                                                     int instancesPerDC)
+    {
         final List<RingInstance> instances = getInstances(initialToken, rfByDC, instancesPerDC);
+        RingInstance instance = instances.remove(0);
+        RingEntry entry = instance.getRingInstance();
+        RingEntry newEntry = new RingEntry.Builder()
+                             .datacenter(entry.datacenter())
+                             .port(entry.port())
+                             .address(entry.address())
+                             .status(InstanceStatus.DOWN.name())
+                             .state(entry.state())
+                             .token(entry.token())
+                             .fqdn(entry.fqdn())
+                             .rack(entry.rack())
+                             .owns(entry.owns())
+                             .load(entry.load())
+                             .hostId(entry.hostId())
+                             .build();
+        RingInstance newInstance = new RingInstance(newEntry);
+        instances.add(0, newInstance);
         ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
         Map<String, Set<String>> writeReplicas =
-        instances.stream()
-                 .collect(Collectors.groupingBy(RingInstance::getDataCenter,
-                                                Collectors.mapping(RingInstance::getNodeName, Collectors.toSet())));
+        instances.stream().collect(Collectors.groupingBy(RingInstance::getDataCenter,
+                                                         Collectors.mapping(RingInstance::getNodeName,
+                                                                            Collectors.toSet())));

Review Comment:
   This is a new file though






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420972314


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##
@@ -69,64 +78,217 @@ public class RecordWriterTest
     @BeforeEach
     public void setUp()
     {
-        tw = new MockTableWriter(folder);
-        ring = RingUtils.buildRing(0, "DC1", "test", 12);
-        writerContext = new MockBulkWriterContext(ring);
+        tw = new MockTableWriter(folder.getRoot());
+        tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(10, ImmutableMap.of("DC1", 3), 12);
+        writerContext = new MockBulkWriterContext(tokenRangeMapping);
         tc = new TestTaskContext();
         range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
         tokenizer = new Tokenizer(writerContext);
     }
 
     @Test
-    public void testSuccessfulWrite()
+    public void testWriteFailWhenTopologyChangeWithinTask()
+    {
+        // Generate token range mapping to simulate node movement of the first node by assigning it a different token
+        // within the same partition
+        int moveTargetToken = 5;
+        TokenRangeMapping<RingInstance> testMapping =
+        TokenRangeMappingUtils.buildTokenRangeMapping(10,
+                                                      ImmutableMap.of("DC1", 3),
+                                                      12,
+                                                      true,
+                                                      moveTargetToken);
+
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+        when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data));
+        assertThat(ex.getMessage(), endsWith("Token range mappings have changed since the task started"));
+    }
+
+    @Test
+    public void testWriteWithExclusions()
+    {
+        TokenRangeMapping<RingInstance> testMapping =
+        TokenRangeMappingUtils.buildTokenRangeMappingWithFailures(10,
+                                                                  ImmutableMap.of("DC1", 3),
+                                                                  12);
+
+        MockBulkWriterContext m = Mockito.spy(writerContext);
+        rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+
+        when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
+        when(m.getInstanceAvailability()).thenCallRealMethod();
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
+    }
+
+    @Test
+    public void testSuccessfulWrite() throws InterruptedException
     {
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithConstantTTL()
+    public void testSuccessfulWriteCheckUploads()
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur : requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
+    @Test
+    public void testWriteWithConstantTTL() throws InterruptedException
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
         validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
     }
 
     @Test
-    public void testWriteWithTTLColumn()
+    public void testWriteWithTTLColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false);
-        String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+        String[] columnNamesWithTtl =
+        {
+        "id", "date", "course", "marks", "ttl"
+        };
         validateSuccessfulWrite(bulkWriterCon

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420965872


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java:
##
@@ -71,64 +72,138 @@ public void addFailure(Range<BigInteger> tokenRange, Instance casInstance, Strin
         for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet())
         {
             Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue());
-
             newErrorMap.put(casInstance, errMessage);
             mappingsToAdd.put(entry.getKey(), newErrorMap);
         }
         failedRangesMap.putAll(mappingsToAdd);
     }
 
-    public boolean hasFailed(ConsistencyLevel consistencyLevel, String localDC)
+    public List<Instance> getFailedInstances()

Review Comment:
   Yep, that should be a `Set`
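
   A hedged sketch of the agreed change (assuming `failedRangesMap` is the RangeMap of per-range error Multimaps seen above; not the committed code):

   public Set<Instance> getFailedInstances()
   {
       // Deduplicate instances that failed on multiple ranges by collecting into a Set.
       return failedRangesMap.asMapOfRanges()
                             .values()
                             .stream()
                             .flatMap(errors -> errors.keySet().stream())
                             .collect(Collectors.toSet());
   }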






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420954777


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TokenPartitioner.java:
##
@@ -215,19 +221,19 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
 
     // In order to best utilize the number of Spark cores while minimizing the number of commit calls,
     // we calculate the number of splits that will just match or exceed the total number of available Spark cores.
-    // NOTE: The actual number of partitions that result from this should always be at least
-    //       the number of token ranges times the number of splits, but can be slightly more.
-    public int calculateSplits(CassandraRing ring,
+    // Note that the actual number of partitions that result from this should always be at least the number of token ranges * the number of splits,
+    // but can be slightly more.
+    public int calculateSplits(TokenRangeMapping<RingInstance> tokenRangeMapping,
                                Integer numberSplits,
                                int defaultParallelism,
                                Integer cores)
     {
-        if (numberSplits >= 0)
+        if (numberSplits != -1)

Review Comment:
   Yep, the previous check makes sense. Don't recall why this change was 
needed. Will revert.
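
   For context, the calculation the comment describes can be sketched as follows (hedged: an illustrative signature and ceiling division, not the PR's exact code; the `numberSplits >= 0` branch is the pre-change override check discussed here):

   // Pick the smallest split count so tokenRangeCount * splits >= available cores.
   static int calculateSplits(int tokenRangeCount, int numberSplits, int defaultParallelism, int cores)
   {
       if (numberSplits >= 0)        // explicit user-provided override, as in the previous check
       {
           return numberSplits;
       }
       int effectiveCores = Math.max(defaultParallelism, cores);
       return (effectiveCores + tokenRangeCount - 1) / tokenRangeCount;   // ceiling division
   }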






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420911706


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/StreamSession.java:
##
@@ -145,32 +157,27 @@ private List<CommitResult> commit(StreamResult streamResult) throws ExecutionExc
     @VisibleForTesting
     List<RingInstance> getReplicas()
     {
-        Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = ring.getSubRanges(tokenRange).asMapOfRanges();
+        List<RingInstance> exclusions = failureHandler.getFailedInstances();
+        final Map<Range<BigInteger>, List<RingInstance>> overlappingRanges = tokenRangeMapping.getSubRanges(tokenRange).asMapOfRanges();
 
-        Preconditions.checkState(overlappingRanges.keySet().size() == 1,
-                                 String.format("Partition range %s is mapping more than one range %s",
-                                               tokenRange, overlappingRanges));
+        LOGGER.debug("[{}]: Stream session token range: {} overlaps with ring ranges: {}", sessionID, tokenRange, overlappingRanges);
+        Preconditions.checkState(!tokenRange.isEmpty(),

Review Comment:
   Moved this up to the point where we extract the token range for the task/partition. We do not need to do this check for overlapping ranges, as it is implicitly validated if we do not have any resulting replicas.
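
   Illustratively, the hoisted check looks something like this (a sketch, assuming the task's range comes from the token partitioner as elsewhere in the PR):

   // Validate the task's token range once, where it is derived, rather than
   // when resolving replicas for each sub-range.
   Range<BigInteger> tokenRange = writerContext.job().getTokenPartitioner().getTokenRange(taskContext.partitionId());
   Preconditions.checkState(!tokenRange.isEmpty(),
                            String.format("Token range for partition %d is empty", taskContext.partitionId()));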






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420859094


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -175,30 +324,31 @@ public void writeRow(Map<String, Object> valueMap,
         }
     }
 
-    void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
+    /**
+     * Stream to replicas; if batchSize is reached, "finalize" SST to "schedule" streamSession
+     */
+    private void checkBatchSize(final StreamSession streamSession, final int partitionId, final JobInfo job) throws IOException

Review Comment:
   done
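
   A hedged sketch of the batching the new javadoc describes (field and accessor names are illustrative; `finalizeSSTable` and the `sstableWriter`/`batchNumber`/`batchSize` fields appear in diffs elsewhere in this thread): once enough rows are buffered, the current SSTable is finalized so the stream session can schedule it, and a fresh batch begins.

   private void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException
   {
       if (batchSize >= job.getCommitBatchSize())   // hypothetical batch-size accessor
       {
           finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
           sstableWriter = null;                    // a new writer is created lazily for the next batch
           batchSize = 0;
       }
   }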






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420858512


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -136,35 +181,139 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         }
     }
 
+    private Map<Range<BigInteger>, List<RingInstance>> taskTokenRangeMapping(TokenRangeMapping<RingInstance> tokenRange,
+                                                                             Range<BigInteger> taskTokenRange)
+    {
+        return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges();
+    }
+
+    private Set<RingInstance> instancesFromMapping(Map<Range<BigInteger>, List<RingInstance>> mapping)
+    {
+        return mapping.values()
+                      .stream()
+                      .flatMap(Collection::stream)
+                      .collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates a new session if we have the current token range intersecting the ranges from write replica-set.
+     * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range
+     * if the token from the row data belongs to the range.
+     */
+    private StreamSession maybeCreateStreamSession(TaskContext taskContext,
+                                                   StreamSession streamSession,
+                                                   Tuple2<DecoratedKey, Object[]> rowData,
+                                                   Set<Range<BigInteger>> subRanges,
+                                                   ReplicaAwareFailureHandler<RingInstance> failureHandler,
+                                                   List<StreamResult> results)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        BigInteger token = rowData._1().getToken();
+        Range<BigInteger> tokenRange = getTokenRange(taskContext);
+
+        Preconditions.checkState(tokenRange.contains(token),
+                                 String.format("Received Token %s outside of expected range %s", token, tokenRange));
+
+        // We have split ranges likely resulting from pending nodes
+        // Evaluate creating a new session if the token from current row is part of a sub-range
+        if (subRanges.size() > 1)
+        {
+            // Create session using sub-range that contains the token from current row
+            Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+            Preconditions.checkState(matchingSubRange != null,
+                                     String.format("Received Token %s outside of expected range %s", token, matchingSubRange));
+            streamSession = maybeCreateSubRangeSession(taskContext, streamSession, failureHandler, results, matchingSubRange);
+        }
+
+        // If we do not have any stream session at this point, we create a session using the partition's token range
+        return (streamSession == null) ? createStreamSession(taskContext) : streamSession;
+    }
+
+    /**
+     * Given that the token belongs to a sub-range, creates a new stream session if either
+     * 1) we do not have an existing stream session, or 2) the existing stream session corresponds to a range that
+     * does NOT match the sub-range the token belongs to.
+     */
+    private StreamSession maybeCreateSubRangeSession(TaskContext taskContext,
+                                                     StreamSession streamSession,
+                                                     ReplicaAwareFailureHandler<RingInstance> failureHandler,
+                                                     List<StreamResult> results,
+                                                     Range<BigInteger> matchingSubRange)
+    throws IOException, ExecutionException, InterruptedException
+    {
+        if (streamSession == null || streamSession.getTokenRange() != matchingSubRange)
+        {
+            LOGGER.debug("[{}] Creating stream session for range: {}", taskContext.partitionId(), matchingSubRange);
+            // Schedule data to be sent if we are processing a batch that has not been scheduled yet.
+            if (streamSession != null)
+            {
+                // Complete existing batched writes (if any) before the existing stream session is closed
+                if (batchSize != 0)
+                {
+                    finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize);
+                    sstableWriter = null;
+                    batchSize = 0;
+                }
+                results.add(streamSession.close());
+            }
+            streamSession = new StreamSession(writerContext, getStreamId(taskContext), matchingSubRange, failureHandler);
+        }
+        return streamSession;
+    }
+
+    /**
+     * Get ranges from the set that intersect and/or overlap with the provided token range
+     */
+    private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges, Range<BigInteger> tokenRange)
+    {
+        return ranges.stream()
+                     .filter(r -> r.isConne

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-08 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1420855953


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -110,20 +133,42 @@ public StreamResult write(Iterator> sourceIterato
 Map valueMap = new HashMap<>();
 try
 {
+Set> newRanges = 
initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+   
.stream()
+   
.map(Map.Entry::getKey)
+   
.collect(Collectors.toSet());

Review Comment:
   makes sense. 
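
   One plausible simplification of the snippet above (hedged; the reviewer's exact suggestion is not quoted): the ranges are already the keys of `asMapOfRanges()`, so copying the key set avoids streaming the entry set.

   Set<Range<BigInteger>> newRanges =
       new HashSet<>(initialTokenRangeMapping.getRangeMap().asMapOfRanges().keySet());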






Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-07 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419770583


##
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java:
##
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import 
o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations;
+import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
+import org.apache.cassandra.spark.bulkwriter.Tokenizer;
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.common.schema.ColumnTypes;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import scala.Tuple2;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for resiliency tests. Contains helper methods for data generation and validation
+ */
+public abstract class ResiliencyTestBase extends IntegrationTestBase
+{
+    public static final int rowCount = 1000;
+    protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s";
+    private static final Logger LOGGER = LoggerFactory.getLogger(ResiliencyTestBase.class);
+    private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));";
+
+    private final ExecutorService executorService = Executors.newCachedThreadPool();
+    private final AtomicReference errorOutput = new AtomicReference<>();
+    private final AtomicReference outputBytes = new AtomicReference<>();
+
+    public QualifiedTableName initializeSchema()
+    {
+        return initializeSchema(ImmutableMap.of("datacenter1", 1));
+    }
+
+    public QualifiedTableName initializeSchema(Map<String, Integer> rf)
+    {
+        createTestKeyspace(rf);
+        return createTestTable(createTableStmt);
+    }
+
+    public Set<String> getDataForRange(Range<BigInteger> range)
+    {
+        // Iterate through all data entries; filter only entries that belong to range; convert to strings
+        return generateExpectedData().stream()
+                                     .filter(t -> range.contains(t._1().getToken()))

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-07 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419767330


##
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java:
##
@@ -231,4 +273,84 @@ protected void waitForKeyspaceAndTable(String keyspaceName, String tableName)
         String.format("Keyspace/table %s/%s did not become visible on all sidecar instances",
                       keyspaceName, tableName));
 }
+
+    /**
+     * A {@link DnsResolver} instance used for tests that provides fast DNS resolution, to avoid blocking
+     * DNS resolution at the JDK/OS-level.
+     *
+     * NOTE: The resolver assumes that the addresses are of the form 127.0.0.x, which is what is currently
+     * configured for integration tests.
+     */
+    static class FastDnsResolver implements DnsResolver

Review Comment:
   Renamed to `LocalhostResolver`
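   For illustration, a minimal sketch of such a resolver, assuming `DnsResolver` exposes a single `resolve(hostname)` method; the class body, pattern, and fallback below are illustrative, not the PR's actual implementation:
   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   class LocalhostResolver
   {
       // Matches test host names like "localhost", "localhost2", "localhost20"
       private static final Pattern HOST_PATTERN = Pattern.compile("^localhost(\\d+)?$");

       // Returns the matching 127.0.0.x address without touching JDK/OS-level DNS
       public String resolve(String hostname)
       {
           Matcher matcher = HOST_PATTERN.matcher(hostname);
           if (!matcher.matches())
           {
               return hostname; // pass through non-test host names unchanged
           }
           String suffix = matcher.group(1);
           return "127.0.0." + (suffix == null ? "1" : suffix);
       }
   }
   ```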



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-07 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419760670


##
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/ResiliencyTestBase.java:
##
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.adapters.base.StorageJmxOperations;
+import o.a.c.analytics.sidecar.shaded.testing.common.JmxClient;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.spark.KryoRegister;
+import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.DecoratedKey;
+import org.apache.cassandra.spark.bulkwriter.Tokenizer;
+import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.common.schema.ColumnTypes;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import scala.Tuple2;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for resiliency tests. Contains helper methods for data generation and validation
+ */
+public abstract class ResiliencyTestBase extends IntegrationTestBase
+{
+    private static final String createTableStmt = "create table if not exists %s (id int, course text, marks int, primary key (id));";
+    protected static final String retrieveRows = "select * from " + TEST_KEYSPACE + ".%s";
+    public static final int rowCount = 1000;
+
+    public QualifiedTableName initializeSchema()
+    {
+        return initializeSchema(ImmutableMap.of("datacenter1", 1));
+    }
+
+    public QualifiedTableName initializeSchema(Map<String, Integer> rf)
+    {
+        createTestKeyspace(rf);
+        return createTestTable(createTableStmt);
+    }
+
+    public SparkConf generateSparkConf()
+    {
+        SparkConf sparkConf = new SparkConf()
+                              .setAppName("Integration test Spark Cassandra Bulk Reader Job")
+                              .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+                              .set("spark.master", "local[8,4]");
+        BulkSparkConf.setupSparkConf(sparkConf, true);
+        KryoRegister.setup(sparkConf);
+        return sparkConf;
+    }
+
+    public SparkSession generateSparkSession(SparkConf sparkConf)
+    {
+        return SparkSession.builder()
+                           .config(sparkConf)

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-07 Thread via GitHub


yifan-c commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419342286


##
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningDoubleClusterTest.java:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.expansion;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.TestInfo;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.analytics.TestUninterruptibles;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+import org.apache.cassandra.utils.Shared;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+
+public class JoiningDoubleClusterTest extends JoiningTestBase
+{
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false)
+    void oneReadAllWrite(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception
+    {
+        BBHelperDoubleClusterSize.reset();
+        runJoiningTestScenario(cassandraTestContext,
+                               BBHelperDoubleClusterSize::install,
+                               BBHelperDoubleClusterSize.transientStateStart,
+                               BBHelperDoubleClusterSize.transientStateEnd,
+                               ConsistencyLevel.ONE,
+                               ConsistencyLevel.ALL,
+                               false,
+                               testInfo.getDisplayName());
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false)
+    void oneReadAllWriteFailure(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception
+    {
+        BBHelperDoubleClusterSizeFailure.reset();
+        runJoiningTestScenario(cassandraTestContext,
+                               BBHelperDoubleClusterSizeFailure::install,
+                               BBHelperDoubleClusterSizeFailure.transientStateStart,
+                               BBHelperDoubleClusterSizeFailure.transientStateEnd,
+                               ConsistencyLevel.ONE,
+                               ConsistencyLevel.ALL,
+                               true,
+                               testInfo.getDisplayName());
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false)
+    void quorumReadQuorumWrite(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception
+    {
+        BBHelperDoubleClusterSize.reset();
+        runJoiningTestScenario(cassandraTestContext,
+                               BBHelperDoubleClusterSize::install,
+                               BBHelperDoubleClusterSize.transientStateStart,
+                               BBHelperDoubleClusterSize.transientStateEnd,
+                               ConsistencyLevel.QUORUM,
+                               ConsistencyLevel.QUORUM,
+                               false,
+                               testInfo.getDisplayName());
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false)
+    void quorumReadQuorumWriteFailure(ConfigurableCassandraTestContext cassandraTestContext, TestInfo testInfo) throws Exception
+    {
+        BBHelperDoubleClusterSizeFailure.reset();
+        runJoiningTestScenario(c

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-07 Thread via GitHub


yifan-c commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1419125362


##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##
@@ -346,19 +366,22 @@ void writeBuffered()
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
-                                         String[] columnNames)
+                                         String[] columnNames) throws InterruptedException
     {
         validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
     }
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames,
                                          int uploadedTables) throws InterruptedException
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
+        // Wait for uploads to finish
+        Thread.sleep(500);

Review Comment:
   Thanks for the improvement!



##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##
@@ -346,19 +366,22 @@ void writeBuffered()
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
-                                         String[] columnNames)
+                                         String[] columnNames) throws InterruptedException
     {
         validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
     }
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames,
                                          int uploadedTables) throws InterruptedException
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
+        // Wait for uploads to finish
+        Thread.sleep(500);

Review Comment:
   Thanks for making the improvement!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-06 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1417772899


##
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/testing/CassandraIntegrationTest.java:
##
@@ -59,6 +59,13 @@
  */
 int numDcs() default 1;
 
+/**
+ * This is only applied in the context of multi-DC tests. Returns true if the keyspace is replicated
+ * across multiple DCs. Defaults to {@code true}.
+ * @return whether the multi-DC test uses a cross-DC keyspace
+ */
+boolean useCrossDcKeyspace() default true;

Review Comment:
   Agreed. We can revisit this when we are refactoring these helpers to make them less error-prone (minimizing the number of params being passed).
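   As a usage sketch of the attribute above (test name and node counts hypothetical), a multi-DC test could opt out of the cross-DC keyspace like so:
   ```java
   // Hypothetical test: two DCs, but the keyspace is replicated within a single DC only
   @CassandraIntegrationTest(nodesPerDc = 3, numDcs = 2, useCrossDcKeyspace = false)
   void writesWithSingleDcKeyspace(ConfigurableCassandraTestContext cassandraTestContext)
   {
       // test body elided
   }
   ```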



##
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/expansion/JoiningBaseTest.java:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.analytics.expansion;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import o.a.c.analytics.sidecar.shaded.testing.common.data.QualifiedTableName;
+import org.apache.cassandra.analytics.ResiliencyTestBase;
+import org.apache.cassandra.analytics.TestTokenSupplier;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class JoiningBaseTest extends ResiliencyTestBase

Review Comment:
   In these scenario-specific base classes, we're only grouping common functionality instead of defining a contract for subclasses to implement, so this seemed appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-06 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1417772294


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
-            List<CassandraInstance> exclusions = failureHandler.getFailedInstances();
             Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
                                                                        .stream()
-                                                                       .filter(e -> !exclusions.contains(e.getValue()))
                                                                        .map(Map.Entry::getKey)
                                                                        .collect(Collectors.toSet());
+            Range<BigInteger> tokenRange = getTokenRange(taskContext);
+            Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+                                               Collections.singleton(tokenRange) :
+                                               getIntersectingSubRanges(newRanges, tokenRange);
 
             while (dataIterator.hasNext())
             {
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
-                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
-
-                sessions.add(streamSession);
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results);
                 maybeCreateTableWriter(partitionId, baseDir);
                 writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
                 checkBatchSize(streamSession, partitionId, job);
             }
 
-            // Finalize SSTable for the last StreamSession
-            if (sstableWriter != null || (streamSession != null && batchSize != 0))
+            // Cleanup SSTable writer and schedule the last stream

Review Comment:
   Makes sense.



##
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java:
##
@@ -346,19 +366,22 @@ void writeBuffered()
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
-                                         String[] columnNames)
+                                         String[] columnNames) throws InterruptedException
     {
         validateSuccessfulWrite(writerContext, data, columnNames, UPLOADED_TABLES);
     }
 
     private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
                                          Iterator<Tuple2<DecoratedKey, Object[]>> data,
                                          String[] columnNames,
                                          int uploadedTables) throws InterruptedException
     {
         RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
         rw.write(data);
+        // Wait for uploads to finish
+        Thread.sleep(500);

Review Comment:
   In general, I agree about the flakiness introduced by the sleep. This was added because, when the entire test suite was executed, we did see the uploads not finishing before we look up the number of files that were uploaded. We could potentially use a latch in the `MockBulkWriterContext` to make this more deterministic. Will explore some more.
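   One possible shape for that latch-based wait, purely as a sketch (the hook and field names are hypothetical, not the actual `MockBulkWriterContext` API):
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   // Replaces the fixed Thread.sleep(500) with a bounded, deterministic wait
   class UploadAwaiter
   {
       private final CountDownLatch uploadsComplete;

       UploadAwaiter(int expectedUploads)
       {
           this.uploadsComplete = new CountDownLatch(expectedUploads);
       }

       // Invoked by the mocked upload path when a file finishes uploading
       void onUploadFinished()
       {
           uploadsComplete.countDown();
       }

       // Test-side wait; returns false if uploads did not finish in time
       boolean awaitUploads(long timeout, TimeUnit unit) throws InterruptedException
       {
           return uploadsComplete.await(timeout, unit);
       }
   }
   ```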



##
cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java:
##
@@ -90,16 +88,25 @@ public InstanceMetadata instanceFromId(int id) throws NoSuchElementException
      * @return instance meta information
      * @throws NoSuchElementException when the instance for {@code host} does not exist
      */
+    @Override
     public InstanceMetadata instanceFromHost(String host) throws NoSuchElementException
     {
-        return cassandraTestContext.instancesConfig.instanceFromHost(host);
+        return cassandraTestContext.instancesConfig().instanceFromHost(host);
     }
 }
 
 @Provides
 @Singleton
 public SidecarConfiguration sidecarConfiguration()
 {
-        return new SidecarConfigurationImpl(new ServiceConfigurationImpl("127.0.0.1"));
+        ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+                                                            .host("0.0.0.0") // binds to all interfaces, potential security issue if left running for long

Review Comment:
   Will defer this one to @JeetKunDoug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail:

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-12-06 Thread via GitHub


yifan-c commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1416591381


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -207,46 +203,62 @@ private Set<CassandraInstance> instancesFromMapping(Map<Range<BigInteger>, List
                                                    Tuple2<DecoratedKey, Object[]> rowData,
-                                                   Set<Range<BigInteger>> newRanges,
-                                                   ReplicaAwareFailureHandler<CassandraInstance> failureHandler) throws IOException
+                                                   Set<Range<BigInteger>> subRanges,
+                                                   ReplicaAwareFailureHandler<CassandraInstance> failureHandler,
+                                                   List<StreamResult> results)
+        throws IOException, ExecutionException, InterruptedException
     {
         BigInteger token = rowData._1().getToken();
         Range<BigInteger> tokenRange = getTokenRange(taskContext);
 
         Preconditions.checkState(tokenRange.contains(token),
                                  String.format("Received Token %s outside of expected range %s", token, tokenRange));
 
-        // token range for this partition is not among the write-replica-set ranges
-        if (!newRanges.contains(tokenRange))
+        // We have split ranges likely resulting from pending nodes
+        // Evaluate creating a new session if the token from current row is part of a sub-range
+        if (subRanges.size() > 1)
         {
-            Set<Range<BigInteger>> subRanges = getIntersectingSubRanges(newRanges, tokenRange);
-            // We have split ranges - likely resulting from pending nodes
-            if (subRanges.size() > 1)
-            {
-                // Create session using sub-range that contains the token from current row
-                Range<BigInteger> matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
-                Preconditions.checkState(matchingRange != null,
-                                         String.format("Received Token %s outside of expected range %s", token, matchingRange));
+            // Create session using sub-range that contains the token from current row
+            Range<BigInteger> matchingSubRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+            Preconditions.checkState(matchingSubRange != null,
+                                     String.format("Received Token %s outside of expected range %s", token, matchingSubRange));

Review Comment:
   The `checkState` will never see `matchingSubRange == null`. The reason is that at line#222, if the value is null, the `get()` operation throws an exception already.
   If the intent is to provide a more user-friendly error message, can you not call `get()`, and instead use `Optional<Range<BigInteger>> matchingSubRangeOpt` to capture the result and run `checkState` on the optional?
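   A sketch of that suggestion (helper shape hypothetical, types as in the diff above):
   ```java
   import java.math.BigInteger;
   import java.util.Optional;
   import java.util.Set;

   import com.google.common.base.Preconditions;
   import com.google.common.collect.Range;

   final class SubRangeLookup
   {
       // Keeps the Optional so checkState can report the empty case with a friendly message
       static Range<BigInteger> matchingSubRange(Set<Range<BigInteger>> subRanges, BigInteger token)
       {
           Optional<Range<BigInteger>> match = subRanges.stream()
                                                        .filter(r -> r.contains(token))
                                                        .findFirst();
           Preconditions.checkState(match.isPresent(),
                                    "Received Token %s outside of expected sub-ranges", token);
           return match.get();
       }
   }
   ```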



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -132,37 +133,32 @@ public List<StreamResult> write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceI
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
-            List<CassandraInstance> exclusions = failureHandler.getFailedInstances();
             Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
                                                                        .stream()
-                                                                       .filter(e -> !exclusions.contains(e.getValue()))
                                                                        .map(Map.Entry::getKey)
                                                                        .collect(Collectors.toSet());
+            Range<BigInteger> tokenRange = getTokenRange(taskContext);
+            Set<Range<BigInteger>> subRanges = newRanges.contains(tokenRange) ?
+                                               Collections.singleton(tokenRange) :
+                                               getIntersectingSubRanges(newRanges, tokenRange);
 
             while (dataIterator.hasNext())
             {
                 Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
-                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
-
-                sessions.add(streamSession);
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, subRanges, failureHandler, results);
                 maybeCreateTableWriter(partitionId, baseDir);
                 writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
                 checkBatchSize(streamSession, partitionId, job);
             }
 
-            // Finalize SSTable for the last StreamSession
-            if (sstableWriter != null || (streamSession != null && batchSize != 0))
+            // Cleanup SSTable writer and schedule the last stream

Review Comment:
   "Cleanup SSTable writer" reads wrong to me. I would stick with "Finalize". 
The code is to flush any data to sstable by closing the writer. Cleanup leads 
me to th

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-11-30 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411436951


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -136,35 +185,122 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         }
     }
 
+    private Map<Range<BigInteger>, List<CassandraInstance>> taskTokenRangeMapping(TokenRangeMapping<CassandraInstance> tokenRange,
+                                                                                  Range<BigInteger> taskTokenRange)
+    {
+        return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges();
+    }
+
+    private Set<CassandraInstance> instancesFromMapping(Map<Range<BigInteger>, List<CassandraInstance>> mapping)
+    {
+        return mapping.values()
+                      .stream()
+                      .flatMap(Collection::stream)
+                      .collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates a new session if we have the current token range intersecting the ranges from the write replica-set.
+     * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range
+     * if the token from the row data belongs to the range.
+     */
+    private StreamSession maybeCreateStreamSession(TaskContext taskContext,
+                                                   StreamSession streamSession,
+                                                   Tuple2<DecoratedKey, Object[]> rowData,
+                                                   Set<Range<BigInteger>> newRanges,
+                                                   ReplicaAwareFailureHandler<CassandraInstance> failureHandler) throws IOException
+    {
+        BigInteger token = rowData._1().getToken();
+        Range<BigInteger> tokenRange = getTokenRange(taskContext);
+
+        Preconditions.checkState(tokenRange.contains(token),
+                                 String.format("Received Token %s outside of expected range %s", token, tokenRange));
+
+        // token range for this partition is not among the write-replica-set ranges
+        if (!newRanges.contains(tokenRange))

Review Comment:
   Makes sense. Pulled this out of the loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-11-30 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411436190


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -110,20 +132,47 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
+            List<CassandraInstance> exclusions = failureHandler.getFailedInstances();
+            Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+                                                                       .stream()
+                                                                       .filter(e -> !exclusions.contains(e.getValue()))
+                                                                       .map(Map.Entry::getKey)
+                                                                       .collect(Collectors.toSet());
+
             while (dataIterator.hasNext())
             {
+                Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
+
+                sessions.add(streamSession);
                 maybeCreateTableWriter(partitionId, baseDir);
-                writeRow(valueMap, dataIterator, partitionId, range);
+                writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange());
                 checkBatchSize(streamSession, partitionId, job);
             }
 
-            if (sstableWriter != null)
+            // Finalize SSTable for the last StreamSession
+            if (sstableWriter != null || (streamSession != null && batchSize != 0))

Review Comment:
   Good point, removing redundant checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-11-30 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411436061


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -110,20 +132,47 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
+            List<CassandraInstance> exclusions = failureHandler.getFailedInstances();
+            Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+                                                                       .stream()
+                                                                       .filter(e -> !exclusions.contains(e.getValue()))

Review Comment:
   Good catch. Evaluating if we even need this filter anymore. Will update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-11-30 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1411327830


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -110,20 +132,47 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         Map<String, Object> valueMap = new HashMap<>();
         try
         {
+            List<CassandraInstance> exclusions = failureHandler.getFailedInstances();
+            Set<Range<BigInteger>> newRanges = initialTokenRangeMapping.getRangeMap().asMapOfRanges().entrySet()
+                                                                       .stream()
+                                                                       .filter(e -> !exclusions.contains(e.getValue()))
+                                                                       .map(Map.Entry::getKey)
+                                                                       .collect(Collectors.toSet());
+
             while (dataIterator.hasNext())
             {
+                Tuple2<DecoratedKey, Object[]> rowData = dataIterator.next();
+                streamSession = maybeCreateStreamSession(taskContext, streamSession, rowData, newRanges, failureHandler);
+
+                sessions.add(streamSession);

Review Comment:
   This was done to separate the session closures instead of having them scattered with checks for non-existent sessions.
   
   However, on looking further, it seems like we do not really need to eagerly create the session for the partition's token range. Will update for it to be created lazily.
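   As a generic sketch of the lazy-creation idea (not the PR's code), the session can be resolved through a supplier on first use:
   ```java
   import java.util.function.Supplier;

   // Defers construction until the first row that needs the session arrives
   class LazySession<T>
   {
       private final Supplier<T> factory;
       private T session;

       LazySession(Supplier<T> factory)
       {
           this.factory = factory;
       }

       T get()
       {
           if (session == null)
           {
               session = factory.get(); // created only on first request
           }
           return session;
       }
   }
   ```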



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org



Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-11-29 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1409995629


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -136,35 +185,122 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         }
     }
 
+    private Map<Range<BigInteger>, List<CassandraInstance>> taskTokenRangeMapping(TokenRangeMapping<CassandraInstance> tokenRange,
+                                                                                  Range<BigInteger> taskTokenRange)
+    {
+        return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges();
+    }
+
+    private Set<CassandraInstance> instancesFromMapping(Map<Range<BigInteger>, List<CassandraInstance>> mapping)
+    {
+        return mapping.values()
+                      .stream()
+                      .flatMap(Collection::stream)
+                      .collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates a new session if we have the current token range intersecting the ranges from the write replica-set.
+     * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range
+     * if the token from the row data belongs to the range.
+     */
+    private StreamSession maybeCreateStreamSession(TaskContext taskContext,
+                                                   StreamSession streamSession,
+                                                   Tuple2<DecoratedKey, Object[]> rowData,
+                                                   Set<Range<BigInteger>> newRanges,
+                                                   ReplicaAwareFailureHandler<CassandraInstance> failureHandler) throws IOException
+    {
+        BigInteger token = rowData._1().getToken();
+        Range<BigInteger> tokenRange = getTokenRange(taskContext);
+
+        Preconditions.checkState(tokenRange.contains(token),
+                                 String.format("Received Token %s outside of expected range %s", token, tokenRange));
+
+        // token range for this partition is not among the write-replica-set ranges
+        if (!newRanges.contains(tokenRange))
+        {
+            Set<Range<BigInteger>> subRanges = getIntersectingSubRanges(newRanges, tokenRange);
+            // We have split ranges - likely resulting from pending nodes
+            if (subRanges.size() > 1)
+            {
+                // Create session using sub-range that contains the token from current row
+                Range<BigInteger> matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+                Preconditions.checkState(matchingRange != null,
+                                         String.format("Received Token %s outside of expected range %s", token, matchingRange));
+
+                if (streamSession != null && streamSession.getTokenRange() == matchingRange)
+                {
+                    return streamSession;
+                }
+                else
+                {
+                    LOGGER.debug(String.format("[{}] Creating stream session for range: %s", matchingRange), taskContext.partitionId());
+                    if (streamSession != null && batchSize != 0)
+                    {
+                        finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize);
+                        sstableWriter = null;
+                        batchSize = 0;
+                    }
+                    return new StreamSession(writerContext, getStreamId(taskContext), matchingRange, failureHandler);
+                }
+            }
+        }
+
+        return (streamSession != null) ? streamSession : createStreamSession(taskContext);
+    }
+
+    /**
+     * Get ranges from the set that intersect and/or overlap with the provided token range
+     */
+    private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges, Range<BigInteger> tokenRange)
+    {
+        return ranges.stream()
+                     .filter(r -> r.isConnected(tokenRange) && !r.intersection(tokenRange).isEmpty())
+                     .collect(Collectors.toSet());
+    }
+
+    private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<CassandraInstance> startTaskMapping, TaskContext taskContext)
+    {
+        Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
+        // Get the uncached, current view of the ring to compare with initial ring
+        TokenRangeMapping<CassandraInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false);
+        Map<Range<BigInteger>, List<CassandraInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange);
+        Map<Range<BigInteger>, List<CassandraInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange);
+
+        return !(startMapping.keySet().equals(endMapping.keySet()) &&
+                 instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping)));

Review Comment:
   Thanks for the input.
   
   We're checking if either the token ranges or the set of instances differ between the start and end of the task, to determine if it should fail the task (and retry).
   
   In your example, if the node is joining at the beginning of the task and has completed joining by the end, the mappings would differ and the task would be failed and retried.

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-11-29 Thread via GitHub


yifan-c commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1409303414


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java:
##
@@ -136,35 +185,122 @@ public StreamResult write(Iterator<Tuple2<DecoratedKey, Object[]>> sourceIterato
         }
     }
 
+    private Map<Range<BigInteger>, List<CassandraInstance>> taskTokenRangeMapping(TokenRangeMapping<CassandraInstance> tokenRange,
+                                                                                  Range<BigInteger> taskTokenRange)
+    {
+        return tokenRange.getSubRanges(taskTokenRange).asMapOfRanges();
+    }
+
+    private Set<CassandraInstance> instancesFromMapping(Map<Range<BigInteger>, List<CassandraInstance>> mapping)
+    {
+        return mapping.values()
+                      .stream()
+                      .flatMap(Collection::stream)
+                      .collect(Collectors.toSet());
+    }
+
+    /**
+     * Creates a new session if we have the current token range intersecting the ranges from the write replica-set.
+     * If we do find the need to split a range into sub-ranges, we create the corresponding session for the sub-range
+     * if the token from the row data belongs to the range.
+     */
+    private StreamSession maybeCreateStreamSession(TaskContext taskContext,
+                                                   StreamSession streamSession,
+                                                   Tuple2<DecoratedKey, Object[]> rowData,
+                                                   Set<Range<BigInteger>> newRanges,
+                                                   ReplicaAwareFailureHandler<CassandraInstance> failureHandler) throws IOException
+    {
+        BigInteger token = rowData._1().getToken();
+        Range<BigInteger> tokenRange = getTokenRange(taskContext);
+
+        Preconditions.checkState(tokenRange.contains(token),
+                                 String.format("Received Token %s outside of expected range %s", token, tokenRange));
+
+        // token range for this partition is not among the write-replica-set ranges
+        if (!newRanges.contains(tokenRange))
+        {
+            Set<Range<BigInteger>> subRanges = getIntersectingSubRanges(newRanges, tokenRange);
+            // We have split ranges - likely resulting from pending nodes
+            if (subRanges.size() > 1)
+            {
+                // Create session using sub-range that contains the token from current row
+                Range<BigInteger> matchingRange = subRanges.stream().filter(r -> r.contains(token)).findFirst().get();
+                Preconditions.checkState(matchingRange != null,
+                                         String.format("Received Token %s outside of expected range %s", token, matchingRange));
+
+                if (streamSession != null && streamSession.getTokenRange() == matchingRange)
+                {
+                    return streamSession;
+                }
+                else
+                {
+                    LOGGER.debug(String.format("[{}] Creating stream session for range: %s", matchingRange), taskContext.partitionId());
+                    if (streamSession != null && batchSize != 0)
+                    {
+                        finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize);
+                        sstableWriter = null;
+                        batchSize = 0;
+                    }
+                    return new StreamSession(writerContext, getStreamId(taskContext), matchingRange, failureHandler);
+                }
+            }
+        }
+
+        return (streamSession != null) ? streamSession : createStreamSession(taskContext);
+    }
+
+    /**
+     * Get ranges from the set that intersect and/or overlap with the provided token range
+     */
+    private Set<Range<BigInteger>> getIntersectingSubRanges(Set<Range<BigInteger>> ranges, Range<BigInteger> tokenRange)
+    {
+        return ranges.stream()
+                     .filter(r -> r.isConnected(tokenRange) && !r.intersection(tokenRange).isEmpty())
+                     .collect(Collectors.toSet());
+    }
+
+    private boolean haveTokenRangeMappingsChanged(TokenRangeMapping<CassandraInstance> startTaskMapping, TaskContext taskContext)
+    {
+        Range<BigInteger> taskTokenRange = getTokenRange(taskContext);
+        // Get the uncached, current view of the ring to compare with initial ring
+        TokenRangeMapping<CassandraInstance> endTaskMapping = writerContext.cluster().getTokenRangeMapping(false);
+        Map<Range<BigInteger>, List<CassandraInstance>> startMapping = taskTokenRangeMapping(startTaskMapping, taskTokenRange);
+        Map<Range<BigInteger>, List<CassandraInstance>> endMapping = taskTokenRangeMapping(endTaskMapping, taskTokenRange);
+
+        return !(startMapping.keySet().equals(endMapping.keySet()) &&
+                 instancesFromMapping(startMapping).equals(instancesFromMapping(endMapping)));

Review Comment:
   nit: I think this is easier to read with fewer parentheses.
   ```suggestion
   return !Objects.equals(startMapping.keySet(), endMapping.keySet()) ||
          !Objects.equals(instancesFromMapping(startMapping), instancesFromMapping(endMapping));
   ```
   
   A follow-up question. I

Re: [PR] Cassandra 18852: Make bulk writer resilient to cluster resize events [cassandra-analytics]

2023-10-25 Thread via GitHub


arjunashok commented on code in PR #17:
URL: 
https://github.com/apache/cassandra-analytics/pull/17#discussion_r1372321724


##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -20,18 +20,43 @@
 package org.apache.cassandra.spark.bulkwriter.token;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 
 public interface ConsistencyLevel
 {
     boolean isLocal();
 
-    boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts, ReplicationFactor replicationFactor, String localDC);
+    Logger LOGGER = LoggerFactory.getLogger(ConsistencyLevel.class);
+
+    /**
+     * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor.
+     * <p>
+     * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes.
+     *   This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy.
+     * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy
+     *   (and non-pending) nodes.
+     *
+     *   For both the above cases, blocked instances are also considered as failures while performing consistency checks.
+     *   Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances.
+     *   This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes.
+     * </p>
+     */
+    boolean checkConsistency(CassandraRing ring,
+                             TokenRangeMapping<CassandraInstance> tokenRangeMapping,

Review Comment:
   Addressed
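   For reference, the quorum arithmetic this javadoc describes reduces to the failure-tolerance check seen in the diff below; a standalone sketch:
   ```java
   // With rf total replicas, a QUORUM write needs rf/2 + 1 healthy replicas,
   // so it tolerates at most rf - (rf/2 + 1) failed ones.
   static boolean quorumHolds(int rf, int failedReplicas)
   {
       return failedReplicas <= rf - (rf / 2 + 1);
   }
   ```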



##
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java:
##
@@ -82,19 +119,25 @@ public boolean checkConsistency(Collection<? extends CassandraInstance> failedIn
         },
         QUORUM
         {
+            // Keyspaces exist with RF 1 or 2
             @Override
             public boolean isLocal()
             {
                 return false;
             }
 
             @Override
-            public boolean checkConsistency(Collection<? extends CassandraInstance> failedInsts,
-                                            ReplicationFactor replicationFactor,
-                                            String localDC)
+            public boolean checkConsistency(final CassandraRing ring,
+                                            final TokenRangeMapping<CassandraInstance> tokenRangeMapping,
+                                            final Collection<? extends CassandraInstance> failedInsts,
+                                            final String localDC)
             {
-                int rf = replicationFactor.getTotalReplicationFactor();
-                return failedInsts.size() <= (rf - (rf / 2 + 1));
+                Set<String> replacingInstances = tokenRangeMapping.getReplacementInstances();
+                Set<String> failedInstanceIPs = failedInsts.stream().map(CassandraInstance::getIpAddress).collect(Collectors.toSet());
+                final long writeReplicaCount = maybeUpdateWriteReplicasForReplacements(tokenRangeMapping.getWriteReplicas(),
+                                                                                       replacingInstances,
+                                                                                       failedInstanceIPs);

Review Comment:
   Addressed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org