This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 191e50aeac8b670de070ed712d6b5ac73305a2ed
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue Sep 9 13:05:54 2025 +0200

    [FLINK-37937] Deal with timeouts: set the consistency level to ONE to be 
consistent between read and write requests, because CL=ANY in write requests 
could lead to hinted writes that would be invisible to subsequent read 
requests. Raise the write request timeout to the same value as the read request 
timeout. Set the replication factor to 2 to cope with a temporarily down 
Cassandra container.
    Put back sequential start of the 2 containers.
---
 .../connectors/cassandra/example/BatchPojoExample.java   |  2 +-
 .../connector/cassandra/CassandraTestEnvironment.java    | 16 +++++++++-------
 .../connector/cassandra/source/CassandraTestContext.java |  8 +++++++-
 .../src/test/resources/cassandra.yaml                    |  2 +-
 4 files changed, 18 insertions(+), 10 deletions(-)

diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
index 441cc09..729d4bc 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
@@ -86,7 +86,7 @@ public class BatchPojoExample {
                                 Pojo.class,
                                 () ->
                                         new Mapper.Option[] {
-                                            
Mapper.Option.consistencyLevel(ConsistencyLevel.ANY)
+                                            
Mapper.Option.consistencyLevel(ConsistencyLevel.ONE)
                                         }));
 
         inputDS.print();
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index bb8d2e5..59604a2 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -40,7 +40,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.MountableFile;
 
 import java.net.InetSocketAddress;
@@ -65,7 +64,7 @@ public class CassandraTestEnvironment implements TestResource 
{
     private static final String CREATE_KEYSPACE_QUERY =
             "CREATE KEYSPACE "
                     + KEYSPACE
-                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
+                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':2};";
 
     public static final String SPLITS_TABLE = "flinksplits";
     /*
@@ -90,6 +89,7 @@ public class CassandraTestEnvironment implements TestResource 
{
                     + " (col1, col2, col3, col4)"
                     + " VALUES (%d, %d, %d, %d)";
     private static final int NB_SPLITS_RECORDS = 1000;
+    private static final int STARTUP_TIMEOUT_MINUTES = 3;
 
     @Container private final CassandraContainer cassandraContainer1;
     @Container private final CassandraContainer cassandraContainer2;
@@ -144,16 +144,19 @@ public class CassandraTestEnvironment implements 
TestResource {
         // configure container start to wait until cassandra is ready to 
receive queries
         // start with retrials
         cassandraContainer1.waitingFor(
-                new 
CassandraQueryWaitStrategy().withStartupTimeout(Duration.ofMinutes(2)));
+                new CassandraQueryWaitStrategy()
+                        
.withStartupTimeout(Duration.ofMinutes(STARTUP_TIMEOUT_MINUTES)));
         cassandraContainer2.waitingFor(
-                new 
CassandraQueryWaitStrategy().withStartupTimeout(Duration.ofMinutes(2)));
-        Startables.deepStart(cassandraContainer1, cassandraContainer2).join();
+                new CassandraQueryWaitStrategy()
+                        
.withStartupTimeout(Duration.ofMinutes(STARTUP_TIMEOUT_MINUTES)));
+        cassandraContainer1.start();
         cassandraContainer1.followOutput(
                 new Slf4jLogConsumer(LOG),
                 OutputFrame.OutputType.END,
                 OutputFrame.OutputType.STDERR,
                 OutputFrame.OutputType.STDOUT);
 
+        cassandraContainer2.start();
         cassandraContainer2.followOutput(
                 new Slf4jLogConsumer(LOG),
                 OutputFrame.OutputType.END,
@@ -168,10 +171,9 @@ public class CassandraTestEnvironment implements 
TestResource {
                         cassandraContainer1.getHost(),
                         cassandraContainer1.getMappedPort(CQL_PORT));
         queryValidator = new QueryValidator(builderForReading);
-        // Lower consistency level ANY is only available for writing.
         builderForWriting =
                 createBuilderWithConsistencyLevel(
-                        ConsistencyLevel.ANY,
+                        ConsistencyLevel.ONE,
                         cassandraContainer1.getHost(),
                         cassandraContainer1.getMappedPort(CQL_PORT));
         session = cluster.connect();
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
index 8aa338a..fc2198a 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.connector.testframe.external.source.TestingSourceSetting
 import org.apache.flink.connectors.cassandra.utils.Pojo;
 import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
 
+import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 
@@ -65,7 +66,12 @@ public class CassandraTestContext implements 
DataStreamSourceExternalContext<Poj
         this.cassandraTestEnvironment = cassandraTestEnvironment;
         createTable();
         mapper = new 
MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class);
-        mapperOptions = () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)};
+        mapperOptions =
+                () ->
+                        new Mapper.Option[] {
+                            Mapper.Option.saveNullFields(true),
+                            
Mapper.Option.consistencyLevel(ConsistencyLevel.ONE)
+                        };
     }
 
     @Override
diff --git a/flink-connector-cassandra/src/test/resources/cassandra.yaml 
b/flink-connector-cassandra/src/test/resources/cassandra.yaml
index 71566fe..cbf69ea 100644
--- a/flink-connector-cassandra/src/test/resources/cassandra.yaml
+++ b/flink-connector-cassandra/src/test/resources/cassandra.yaml
@@ -1123,7 +1123,7 @@ range_request_timeout: 10000ms
 # How long the coordinator should wait for writes to complete.
 # Lowest acceptable value is 10 ms.
 # Min unit: ms
-write_request_timeout: 6000ms
+write_request_timeout: 15000ms
 # How long the coordinator should wait for counter writes to complete.
 # Lowest acceptable value is 10 ms.
 # Min unit: ms

Reply via email to