This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit ebbc2181c7bc80fd1ae3bee2008cb2133db41b0f
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Jul 25 18:37:04 2025 +0200

    [FLINK-37937] Add a node to Cassandra testContainers cluster
---
 .../cassandra/CassandraTestEnvironment.java        | 64 +++++++++++++++++-----
 1 file changed, 50 insertions(+), 14 deletions(-)

diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index 7d5e568..5cd21de 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -34,13 +34,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.OutputFrame;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.net.InetSocketAddress;
+import java.time.Duration;
 
 /**
  * Junit test environment that contains everything needed at the test suite level: testContainer
@@ -88,7 +91,8 @@ public class CassandraTestEnvironment implements TestResource {
                     + " VALUES (%d, %d, %d, %d)";
     private static final int NB_SPLITS_RECORDS = 1000;
 
-    @Container private final CassandraContainer cassandraContainer;
+    @Container private final CassandraContainer cassandraContainer1;
+    @Container private final CassandraContainer cassandraContainer2;
 
     boolean insertTestDataForSplitSizeTests;
     private Cluster cluster;
@@ -99,13 +103,31 @@ public class CassandraTestEnvironment implements TestResource {
 
     public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
         this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
-        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
-        // more generous timeouts
+
+        Network network = Network.newNetwork();
+        cassandraContainer1 = (CassandraContainer) new CassandraContainer(DOCKER_CASSANDRA_IMAGE)
+                .withNetwork(network)
+                .withEnv("CASSANDRA_CLUSTER_NAME", "testcontainers")
+                .withEnv("CASSANDRA_SEEDS", "cassandra")
+                .withEnv("JVM_OPTS", "")
+                .withNetworkAliases("cassandra");
+
+        addJavaOpts(
+                cassandraContainer1,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000");
+        cassandraContainer2 = (CassandraContainer) new CassandraContainer(DOCKER_CASSANDRA_IMAGE)
+                .withNetwork(network)
+                .withEnv("CASSANDRA_CLUSTER_NAME", "testcontainers")
+                .withEnv("JVM_OPTS", "")
+                .withEnv("CASSANDRA_SEEDS", "cassandra");
         addJavaOpts(
-                cassandraContainer,
+                cassandraContainer2,
                 "-Dcassandra.request_timeout_in_ms=30000",
                 "-Dcassandra.read_request_timeout_in_ms=15000",
                 "-Dcassandra.write_request_timeout_in_ms=6000");
+
     }
 
     @Override
@@ -125,29 +147,41 @@ public class CassandraTestEnvironment implements TestResource {
 
     private void startEnv() throws Exception {
         // configure container start to wait until cassandra is ready to receive queries
-        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
         // start with retrials
-        cassandraContainer.start();
-        cassandraContainer.followOutput(
+        cassandraContainer1.waitingFor(
+                Wait.forLogMessage(".*Startup complete.*", 1)
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        cassandraContainer1.start();
+        cassandraContainer1.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cassandraContainer2.waitingFor(
+                Wait.forLogMessage(".*Startup complete.*", 1)
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+        cassandraContainer2.start();
+        cassandraContainer2.followOutput(
                 new Slf4jLogConsumer(LOG),
                 OutputFrame.OutputType.END,
                 OutputFrame.OutputType.STDERR,
                 OutputFrame.OutputType.STDOUT);
 
-        cluster = cassandraContainer.getCluster();
+        cluster = cassandraContainer1.getCluster();
         // ConsistencyLevel.ONE is the minimum level for reading
         builderForReading =
                 createBuilderWithConsistencyLevel(
                         ConsistencyLevel.ONE,
-                        cassandraContainer.getHost(),
-                        cassandraContainer.getMappedPort(CQL_PORT));
+                        cassandraContainer1.getHost(),
+                        cassandraContainer1.getMappedPort(CQL_PORT));
         queryValidator = new QueryValidator(builderForReading);
         // Lower consistency level ANY is only available for writing.
         builderForWriting =
                 createBuilderWithConsistencyLevel(
                         ConsistencyLevel.ANY,
-                        cassandraContainer.getHost(),
-                        cassandraContainer.getMappedPort(CQL_PORT));
+                        cassandraContainer1.getHost(),
+                        cassandraContainer1.getMappedPort(CQL_PORT));
         session = cluster.connect();
         executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
         // create a dedicated table for split size tests (to avoid having to flush with each test)
@@ -173,7 +207,8 @@ public class CassandraTestEnvironment implements TestResource {
         if (cluster != null) {
             cluster.close();
         }
-        cassandraContainer.stop();
+        cassandraContainer1.stop();
+        cassandraContainer2.stop();
     }
 
     private ClusterBuilder createBuilderWithConsistencyLevel(
@@ -204,7 +239,8 @@ public class CassandraTestEnvironment implements TestResource {
      * size_estimates system table.
      */
     void flushMemTables(String table) throws Exception {
-        cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, table);
+        cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table);
+        cassandraContainer2.execInContainer("nodetool", "flush", KEYSPACE, table);
         Thread.sleep(FLUSH_MEMTABLES_DELAY);
     }
 

Reply via email to