This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
commit 444164900103b1a36b6d2135c27e2f18cc9af9b5 Author: Etienne Chauchot <[email protected]> AuthorDate: Mon Jul 28 14:25:26 2025 +0200 [FLINK-37937] Use nodetool refreshsizeestimates in addition to flush to update size estimates: flush updates the SSTables and refreshsizeestimates updates the size estimates based on them --- .../cassandra/CassandraTestEnvironment.java | 36 +++++++++++++--------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java index 5cd21de..2e89383 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java @@ -29,15 +29,13 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.SocketOptions; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.CassandraContainer; -import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.OutputFrame; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -57,9 +55,6 @@ public class CassandraTestEnvironment implements TestResource { private static final int READ_TIMEOUT_MILLIS = 36000; - // flushing mem table to SS tables is an 
asynchronous operation that may take a while - private static final long FLUSH_MEMTABLES_DELAY = 30_000L; - public static final String KEYSPACE = "flink"; private static final String CREATE_KEYSPACE_QUERY = @@ -90,6 +85,7 @@ public class CassandraTestEnvironment implements TestResource { + " (col1, col2, col3, col4)" + " VALUES (%d, %d, %d, %d)"; private static final int NB_SPLITS_RECORDS = 1000; + private static final long FLSUH_MEMTABLES_DELAY = 30_000L; @Container private final CassandraContainer cassandraContainer1; @Container private final CassandraContainer cassandraContainer2; @@ -184,7 +180,7 @@ public class CassandraTestEnvironment implements TestResource { cassandraContainer1.getMappedPort(CQL_PORT)); session = cluster.connect(); executeRequestWithTimeout(CREATE_KEYSPACE_QUERY); - // create a dedicated table for split size tests (to avoid having to flush with each test) + // create a dedicated table for split size tests if (insertTestDataForSplitSizeTests) { insertTestDataForSplitSizeTests(); } @@ -196,7 +192,7 @@ public class CassandraTestEnvironment implements TestResource { for (int i = 0; i < NB_SPLITS_RECORDS; i++) { executeRequestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i, i, i)); } - flushMemTables(SPLITS_TABLE); + refreshSizeEstimates(SPLITS_TABLE); } private void stopEnv() { @@ -234,14 +230,24 @@ public class CassandraTestEnvironment implements TestResource { } /** - * Force the flush of cassandra memTables to SSTables in order to update size_estimates. It is - * needed for the tests because we just inserted records, we need to force cassandra to update - * size_estimates system table. + * Force the refresh of system.size_estimates table. It is needed for the tests because we just + * inserted records. It is done on a single node as the size estimation for split generation is + * evaluated based on the ring fraction the connected node represents in the cluster.
We first + * flush the memTables to SSTables because the size estimates are only on SSTables. Then we refresh + * the size estimates. */ - void flushMemTables(String table) throws Exception { - cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table); - cassandraContainer2.execInContainer("nodetool", "flush", KEYSPACE, table); - Thread.sleep(FLUSH_MEMTABLES_DELAY); + void refreshSizeEstimates(String table) throws Exception { + final ExecResult execResult1 = + cassandraContainer1.execInContainer( + "nodetool", "flush", KEYSPACE, table); + Thread.sleep(FLSUH_MEMTABLES_DELAY); + final ExecResult execResult2 = + cassandraContainer1.execInContainer( + "nodetool", "refreshsizeestimates"); + if (execResult1.getExitCode() != 0 || execResult2.getExitCode() != 0) { + throw new RuntimeException( + "Failed to refresh system.size_estimates on the Cassandra cluster"); + } } public ResultSet executeRequestWithTimeout(String query) {
