This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
commit 191e50aeac8b670de070ed712d6b5ac73305a2ed Author: Etienne Chauchot <[email protected]> AuthorDate: Tue Sep 9 13:05:54 2025 +0200 [FLINK-37937] Deal with timeouts: put consistency level to ONE to be coherent between read and write request because CL=ANY in write requests could lead to hint writes that would be invisible to subsequent read requests. Raise write request timeout to the same timeout as read request. Put replication factor to 2 to deal with temporary down cassandra container. But back sequential start of the 2 containers. --- .../connectors/cassandra/example/BatchPojoExample.java | 2 +- .../connector/cassandra/CassandraTestEnvironment.java | 16 +++++++++------- .../connector/cassandra/source/CassandraTestContext.java | 8 +++++++- .../src/test/resources/cassandra.yaml | 2 +- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java index 441cc09..729d4bc 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -86,7 +86,7 @@ public class BatchPojoExample { Pojo.class, () -> new Mapper.Option[] { - Mapper.Option.consistencyLevel(ConsistencyLevel.ANY) + Mapper.Option.consistencyLevel(ConsistencyLevel.ONE) })); inputDS.print(); diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java index bb8d2e5..59604a2 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java @@ -40,7 +40,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.MountableFile; import java.net.InetSocketAddress; @@ -65,7 +64,7 @@ public class CassandraTestEnvironment implements TestResource { private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE " + KEYSPACE - + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':2};"; public static final String SPLITS_TABLE = "flinksplits"; /* @@ -90,6 +89,7 @@ public class CassandraTestEnvironment implements TestResource { + " (col1, col2, col3, col4)" + " VALUES (%d, %d, %d, %d)"; private static final int NB_SPLITS_RECORDS = 1000; + private static final int STARTUP_TIMEOUT_MINUTES = 3; @Container private final CassandraContainer cassandraContainer1; @Container private final CassandraContainer cassandraContainer2; @@ -144,16 +144,19 @@ public class CassandraTestEnvironment implements TestResource { // configure container start to wait until cassandra is ready to receive queries // start with retrials cassandraContainer1.waitingFor( - new CassandraQueryWaitStrategy().withStartupTimeout(Duration.ofMinutes(2))); + new CassandraQueryWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(STARTUP_TIMEOUT_MINUTES))); cassandraContainer2.waitingFor( - new CassandraQueryWaitStrategy().withStartupTimeout(Duration.ofMinutes(2))); - Startables.deepStart(cassandraContainer1, cassandraContainer2).join(); + new CassandraQueryWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(STARTUP_TIMEOUT_MINUTES))); + cassandraContainer1.start(); cassandraContainer1.followOutput( new Slf4jLogConsumer(LOG), OutputFrame.OutputType.END, OutputFrame.OutputType.STDERR, OutputFrame.OutputType.STDOUT); + cassandraContainer2.start(); cassandraContainer2.followOutput( new Slf4jLogConsumer(LOG), OutputFrame.OutputType.END, @@ -168,10 +171,9 @@ public class CassandraTestEnvironment implements TestResource { cassandraContainer1.getHost(), cassandraContainer1.getMappedPort(CQL_PORT)); queryValidator = new QueryValidator(builderForReading); - // Lower consistency level ANY is only available for writing. builderForWriting = createBuilderWithConsistencyLevel( - ConsistencyLevel.ANY, + ConsistencyLevel.ONE, cassandraContainer1.getHost(), cassandraContainer1.getMappedPort(CQL_PORT)); session = cluster.connect(); diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java index 8aa338a..fc2198a 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java @@ -28,6 +28,7 @@ import org.apache.flink.connector.testframe.external.source.TestingSourceSetting import org.apache.flink.connectors.cassandra.utils.Pojo; import org.apache.flink.streaming.connectors.cassandra.MapperOptions; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; @@ -65,7 +66,12 @@ public class CassandraTestContext implements DataStreamSourceExternalContext<Poj this.cassandraTestEnvironment = cassandraTestEnvironment; createTable(); mapper = new MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class); - mapperOptions = () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}; + mapperOptions = + () -> + new Mapper.Option[] { + Mapper.Option.saveNullFields(true), + Mapper.Option.consistencyLevel(ConsistencyLevel.ONE) + }; } @Override diff --git a/flink-connector-cassandra/src/test/resources/cassandra.yaml b/flink-connector-cassandra/src/test/resources/cassandra.yaml index 71566fe..cbf69ea 100644 --- a/flink-connector-cassandra/src/test/resources/cassandra.yaml +++ b/flink-connector-cassandra/src/test/resources/cassandra.yaml @@ -1123,7 +1123,7 @@ range_request_timeout: 10000ms # How long the coordinator should wait for writes to complete. # Lowest acceptable value is 10 ms. # Min unit: ms -write_request_timeout: 6000ms +write_request_timeout: 15000ms # How long the coordinator should wait for counter writes to complete. # Lowest acceptable value is 10 ms. # Min unit: ms
