This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
commit ebbc2181c7bc80fd1ae3bee2008cb2133db41b0f Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Jul 25 18:37:04 2025 +0200 [FLINK-37937] Add a node to Cassandra testContainers cluster --- .../cassandra/CassandraTestEnvironment.java | 64 +++++++++++++++++----- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java index 7d5e568..5cd21de 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java @@ -34,13 +34,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.CassandraContainer; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.net.InetSocketAddress; +import java.time.Duration; /** * Junit test environment that contains everything needed at the test suite level: testContainer @@ -88,7 +91,8 @@ public class CassandraTestEnvironment implements TestResource { + " VALUES (%d, %d, %d, %d)"; private static final int NB_SPLITS_RECORDS = 1000; - @Container private final CassandraContainer cassandraContainer; + @Container private final CassandraContainer cassandraContainer1; + @Container private final CassandraContainer cassandraContainer2; boolean insertTestDataForSplitSizeTests; private Cluster cluster; @@ -99,13 +103,31 @@ public class CassandraTestEnvironment implements TestResource { public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) { this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests; - cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE); - // more generous timeouts + + Network network = Network.newNetwork(); + cassandraContainer1 = (CassandraContainer) new CassandraContainer(DOCKER_CASSANDRA_IMAGE) + .withNetwork(network) + .withEnv("CASSANDRA_CLUSTER_NAME", "testcontainers") + .withEnv("CASSANDRA_SEEDS", "cassandra") + .withEnv("JVM_OPTS", "") + .withNetworkAliases("cassandra"); + + addJavaOpts( + cassandraContainer1, + "-Dcassandra.request_timeout_in_ms=30000", + "-Dcassandra.read_request_timeout_in_ms=15000", + "-Dcassandra.write_request_timeout_in_ms=6000"); + cassandraContainer2 = (CassandraContainer) new CassandraContainer(DOCKER_CASSANDRA_IMAGE) + .withNetwork(network) + .withEnv("CASSANDRA_CLUSTER_NAME", "testcontainers") + .withEnv("JVM_OPTS", "") + .withEnv("CASSANDRA_SEEDS", "cassandra"); addJavaOpts( - cassandraContainer, + cassandraContainer2, "-Dcassandra.request_timeout_in_ms=30000", "-Dcassandra.read_request_timeout_in_ms=15000", "-Dcassandra.write_request_timeout_in_ms=6000"); + } @Override @@ -125,29 +147,41 @@ public class CassandraTestEnvironment implements TestResource { private void startEnv() throws Exception { // configure container start to wait until cassandra is ready to receive queries - cassandraContainer.waitingFor(new CassandraQueryWaitStrategy()); // start with retrials - cassandraContainer.start(); - cassandraContainer.followOutput( + cassandraContainer1.waitingFor( + Wait.forLogMessage(".*Startup complete.*", 1) + .withStartupTimeout(Duration.ofMinutes(2))); + cassandraContainer1.start(); + cassandraContainer1.followOutput( + new Slf4jLogConsumer(LOG), + OutputFrame.OutputType.END, + OutputFrame.OutputType.STDERR, + OutputFrame.OutputType.STDOUT); + + cassandraContainer2.waitingFor( + Wait.forLogMessage(".*Startup complete.*", 1) + .withStartupTimeout(Duration.ofMinutes(2))); + cassandraContainer2.start(); + cassandraContainer2.followOutput( new Slf4jLogConsumer(LOG), OutputFrame.OutputType.END, OutputFrame.OutputType.STDERR, OutputFrame.OutputType.STDOUT); - cluster = cassandraContainer.getCluster(); + cluster = cassandraContainer1.getCluster(); // ConsistencyLevel.ONE is the minimum level for reading builderForReading = createBuilderWithConsistencyLevel( ConsistencyLevel.ONE, - cassandraContainer.getHost(), - cassandraContainer.getMappedPort(CQL_PORT)); + cassandraContainer1.getHost(), + cassandraContainer1.getMappedPort(CQL_PORT)); queryValidator = new QueryValidator(builderForReading); // Lower consistency level ANY is only available for writing. builderForWriting = createBuilderWithConsistencyLevel( ConsistencyLevel.ANY, - cassandraContainer.getHost(), - cassandraContainer.getMappedPort(CQL_PORT)); + cassandraContainer1.getHost(), + cassandraContainer1.getMappedPort(CQL_PORT)); session = cluster.connect(); executeRequestWithTimeout(CREATE_KEYSPACE_QUERY); // create a dedicated table for split size tests (to avoid having to flush with each test) @@ -173,7 +207,8 @@ public class CassandraTestEnvironment implements TestResource { if (cluster != null) { cluster.close(); } - cassandraContainer.stop(); + cassandraContainer1.stop(); + cassandraContainer2.stop(); } private ClusterBuilder createBuilderWithConsistencyLevel( @@ -204,7 +239,8 @@ public class CassandraTestEnvironment implements TestResource { * size_estimates system table. */ void flushMemTables(String table) throws Exception { - cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, table); + cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table); + cassandraContainer2.execInContainer("nodetool", "flush", KEYSPACE, table); Thread.sleep(FLUSH_MEMTABLES_DELAY); }
