This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git

commit eb10f9ef4e43d4ee4d2dafec43eb5db6b5a7588f
Author: Etienne Chauchot <[email protected]>
AuthorDate: Tue Jul 29 11:34:30 2025 +0200

    [FLINK-37937] Make the estimatedTableSize computed during split preparation
    accessible to tests, so that tests no longer call estimateTableSize()
    directly. In refreshSizeEstimates, wait until system.size_estimates contains
    at least one row for the table and every such row has a non-zero
    mean_partition_size.
---
 .../dcfaa83d-a12c-48e1-9e51-b8d3808cd287              |  2 +-
 .../cassandra/source/split/SplitsGenerator.java       | 19 ++++++++++++-------
 .../connector/cassandra/CassandraTestEnvironment.java | 17 +++++++++++++++--
 .../cassandra/source/CassandraSourceITCase.java       | 10 ++++------
 4 files changed, 32 insertions(+), 16 deletions(-)

diff --git 
a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287
 
b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287
index 65e52c7..3fc516c 100644
--- 
a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287
+++ 
b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287
@@ -14,7 +14,7 @@ Method 
<org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.build
 Method 
<org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.validateCommonParameters()>
 calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, 
java.lang.String)> in (CassandraSourceBuilder.java:185)
 Method 
<org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.validateCommonParameters()>
 calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, 
java.lang.String)> in (CassandraSourceBuilder.java:186)
 Method 
<org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String,
 java.lang.String)> is annotated with 
<org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0)
-Method 
<org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(SplitsGenerator.java:0)
+Method 
<org.apache.flink.connector.cassandra.source.split.SplitsGenerator.getEstimatedTableSize()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(SplitsGenerator.java:0)
 Method 
<org.apache.flink.connector.cassandra.source.utils.QueryValidator.checkQueryValidity(java.lang.String)>
 calls method <org.apache.flink.util.Preconditions.checkState(boolean, 
java.lang.Object)> in (QueryValidator.java:57)
 Method 
<org.apache.flink.connector.cassandra.source.utils.QueryValidator.checkQueryValidity(java.lang.String)>
 calls method <org.apache.flink.util.Preconditions.checkState(boolean, 
java.lang.Object)> in (QueryValidator.java:61)
 Method 
<org.apache.flink.connector.cassandra.source.utils.QueryValidator.extractFilteringColumns(java.lang.String)>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(QueryValidator.java:0)
diff --git 
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
 
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
index 98585c6..9676805 100644
--- 
a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
+++ 
b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
@@ -49,6 +49,7 @@ public final class SplitsGenerator {
     private final String table;
     private final int parallelism;
     private final long maxSplitMemorySize;
+    private long estimatedTableSize;
 
     public SplitsGenerator(
             CassandraPartitioner partitioner,
@@ -91,8 +92,8 @@ public final class SplitsGenerator {
      */
     private long decideOnNumSplits() {
         long numSplits;
-        final long estimateTableSize = estimateTableSize();
-        if (estimateTableSize == 0) { // size estimates unavailable
+        estimatedTableSize = estimateTableSize();
+        if (estimatedTableSize == 0) { // size estimates unavailable
             LOG.info(
                     "Cassandra size estimates are not available for {}.{} 
table. Creating as many splits as parallelism ({})",
                     keyspace,
@@ -105,11 +106,11 @@ public final class SplitsGenerator {
                     "Estimated size for {}.{} table is {} bytes",
                     keyspace,
                     table,
-                    estimateTableSize);
+                    estimatedTableSize);
             numSplits =
-                    estimateTableSize / maxSplitMemorySize == 0
+                    estimatedTableSize / maxSplitMemorySize == 0
                             ? parallelism
-                            : estimateTableSize / maxSplitMemorySize;
+                            : estimatedTableSize / maxSplitMemorySize;
             LOG.info(
                     "maxSplitMemorySize set value ({}) leads to the creation 
of {} splits",
                     maxSplitMemorySize,
@@ -123,8 +124,7 @@ public final class SplitsGenerator {
      * just inserted and the amount of data in the table was small. This is 
very common situation
      * during tests.
      */
-    @VisibleForTesting
-    public long estimateTableSize() {
+    private long estimateTableSize() {
         List<TokenRange> tokenRanges = getTokenRangesOfTable();
         long size = 0L;
         for (TokenRange tokenRange : tokenRanges) {
@@ -229,6 +229,11 @@ public final class SplitsGenerator {
         }
     }
 
+    @VisibleForTesting
+    public long getEstimatedTableSize() {
+        return estimatedTableSize;
+    }
+
     private static class TokenRange {
         private final long partitionCount;
         private final long meanPartitionSize;
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
index dc783e9..24e8fda 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java
@@ -26,6 +26,7 @@ import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.SocketOptions;
@@ -43,6 +44,8 @@ import org.testcontainers.utility.MountableFile;
 
 import java.net.InetSocketAddress;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Junit test environment that contains everything needed at the test suite 
level: testContainer
@@ -86,7 +89,6 @@ public class CassandraTestEnvironment implements TestResource 
{
                     + " (col1, col2, col3, col4)"
                     + " VALUES (%d, %d, %d, %d)";
     private static final int NB_SPLITS_RECORDS = 1000;
-    private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
 
     @Container private final CassandraContainer cassandraContainer1;
     @Container private final CassandraContainer cassandraContainer2;
@@ -235,13 +237,24 @@ public class CassandraTestEnvironment implements 
TestResource {
     void refreshSizeEstimates(String table) throws Exception {
         final ExecResult execResult1 =
                 cassandraContainer1.execInContainer("nodetool", "flush", 
KEYSPACE, table);
-        Thread.sleep(FLUSH_MEMTABLES_DELAY);
         final ExecResult execResult2 =
                 cassandraContainer1.execInContainer("nodetool", 
"refreshsizeestimates");
         if (execResult1.getExitCode() != 0 || execResult2.getExitCode() != 0) {
             throw new RuntimeException(
                     "Failed to refresh system.size_estimates on the Cassandra 
cluster");
         }
+        List<Row> partitions = new ArrayList<>();
+        while (partitions.isEmpty()
+                || partitions.stream().anyMatch(row -> 
row.getLong("mean_partition_size") == 0L)) {
+            Thread.sleep(1000);
+            partitions =
+                    session.execute(
+                                    "SELECT range_start, range_end, 
partitions_count, mean_partition_size FROM "
+                                            + "system.size_estimates WHERE 
keyspace_name = ? AND table_name = ?",
+                                    KEYSPACE,
+                                    table)
+                            .all();
+        }
     }
 
     public ResultSet executeRequestWithTimeout(String query) {
diff --git 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
index 5268d6b..f74a41b 100644
--- 
a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
+++ 
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
@@ -62,7 +62,6 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Test for the Cassandra source. */
 class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
 
-    private static final long EXPECTED_TABLE_SIZE = 51200L;
     @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
 
     @TestExternalSystem
@@ -152,10 +151,9 @@ class CassandraSourceITCase extends 
SourceTestSuiteBase<Pojo> {
                         SPLITS_TABLE,
                         parallelism,
                         maxSplitMemorySize);
-        final long tableSize = generator.estimateTableSize();
-        // sanity check to ensure that the size estimates were updated in the 
Cassandra cluster
-        assertThat(tableSize).isEqualTo(EXPECTED_TABLE_SIZE);
         final CassandraEnumeratorState cassandraEnumeratorState = 
generator.prepareSplits();
+        final long tableSize = generator.getEstimatedTableSize();
+
         assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate())
                 // regular case
                 .isEqualTo(tableSize / maxSplitMemorySize);
@@ -177,9 +175,9 @@ class CassandraSourceITCase extends 
SourceTestSuiteBase<Pojo> {
                         SPLITS_TABLE,
                         parallelism,
                         100_000_000L);
-        // sanity check to ensure that the size estimates were updated in the 
Cassandra cluster
-        
assertThat(generator.estimateTableSize()).isEqualTo(EXPECTED_TABLE_SIZE);
         final CassandraEnumeratorState cassandraEnumeratorState = 
generator.prepareSplits();
+        final long tableSize = generator.getEstimatedTableSize();
+
         // maxSplitMemorySize is too high compared to table size. Falling back 
to parallelism splits
         // too low maxSplitMemorySize is guarded by an assertion > min at 
source creation
         
assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);

Reply via email to