This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit 444164900103b1a36b6d2135c27e2f18cc9af9b5
Author: Etienne Chauchot <[email protected]>
AuthorDate: Mon Jul 28 14:25:26 2025 +0200

    [FLINK-37937] Use nodetool refreshsizeestimates in addition to flush to update size estimates: flush writes the memtables to SSTables, and refreshsizeestimates then updates the size estimates from those SSTables
---
 .../cassandra/CassandraTestEnvironment.java        | 48 +++++++++++++++-------
 1 file changed, 33 insertions(+), 15 deletions(-)

diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index 5cd21de..2e89383 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -29,15 +29,13 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.SocketOptions;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
-import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Container.ExecResult;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.OutputFrame;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
@@ -57,9 +55,6 @@ public class CassandraTestEnvironment implements TestResource {
 
     private static final int READ_TIMEOUT_MILLIS = 36000;
 
-    // flushing mem table to SS tables is an asynchronous operation that may take a while
-    private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
-
     public static final String KEYSPACE = "flink";
 
     private static final String CREATE_KEYSPACE_QUERY =
@@ -90,6 +85,8 @@ public class CassandraTestEnvironment implements TestResource {
                     + " (col1, col2, col3, col4)"
                     + " VALUES (%d, %d, %d, %d)";
     private static final int NB_SPLITS_RECORDS = 1000;
+    // time to let "nodetool flush" write memtables to SSTables before refreshing size estimates
+    private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
 
     @Container private final CassandraContainer cassandraContainer1;
     @Container private final CassandraContainer cassandraContainer2;
@@ -184,7 +181,7 @@ public class CassandraTestEnvironment implements TestResource {
                         cassandraContainer1.getMappedPort(CQL_PORT));
         session = cluster.connect();
         executeRequestWithTimeout(CREATE_KEYSPACE_QUERY);
-        // create a dedicated table for split size tests (to avoid having to flush with each test)
+        // create a dedicated table for split size tests
         if (insertTestDataForSplitSizeTests) {
             insertTestDataForSplitSizeTests();
         }
@@ -196,7 +193,7 @@ public class CassandraTestEnvironment implements TestResource {
         for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
             executeRequestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i, i, i));
         }
-        flushMemTables(SPLITS_TABLE);
+        refreshSizeEstimates(SPLITS_TABLE);
     }
 
     private void stopEnv() {
@@ -234,14 +231,35 @@ public class CassandraTestEnvironment implements TestResource {
     }
 
     /**
-     * Force the flush of cassandra memTables to SSTables in order to update size_estimates. It is
-     * needed for the tests because we just inserted records, we need to force cassandra to update
-     * size_estimates system table.
+     * Force the refresh of the system.size_estimates table. It is needed for the tests because we
+     * just inserted records. It is done on a single node because the size estimation used for
+     * split generation is evaluated based on the ring fraction that the connected node represents
+     * in the cluster. We first flush the memtables to SSTables because the size estimates only
+     * take SSTables into account, then we refresh the size estimates.
+     *
+     * @param table the table of {@link #KEYSPACE} to flush before refreshing the size estimates
+     * @throws IllegalStateException if a nodetool command exits with a non-zero code
      */
-    void flushMemTables(String table) throws Exception {
-        cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table);
-        cassandraContainer2.execInContainer("nodetool", "flush", KEYSPACE, table);
-        Thread.sleep(FLUSH_MEMTABLES_DELAY);
+    void refreshSizeEstimates(String table) throws Exception {
+        final ExecResult flushResult =
+                cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table);
+        // fail fast instead of waiting for a flush that did not succeed
+        checkNodetoolSucceeded(flushResult, "flush");
+        Thread.sleep(FLUSH_MEMTABLES_DELAY);
+        final ExecResult refreshResult =
+                cassandraContainer1.execInContainer("nodetool", "refreshsizeestimates");
+        checkNodetoolSucceeded(refreshResult, "refreshsizeestimates");
     }
+
+    /** Throws if the given nodetool command did not exit with code 0. */
+    private static void checkNodetoolSucceeded(ExecResult result, String command) {
+        if (result.getExitCode() != 0) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Failed to refresh system.size_estimates on the Cassandra cluster: "
+                                    + "nodetool %s exited with code %d, stderr: %s",
+                            command, result.getExitCode(), result.getStderr()));
+        }
+    }
 
     public ResultSet executeRequestWithTimeout(String query) {
 
     public ResultSet executeRequestWithTimeout(String query) {

Reply via email to