This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
commit eb10f9ef4e43d4ee4d2dafec43eb5db6b5a7588f Author: Etienne Chauchot <[email protected]> AuthorDate: Tue Jul 29 11:34:30 2025 +0200 [FLINK-37937] Make estimatedTableSize calculated during split preparation accessible to tests and not call estimate_size during tests. In refreshSizeEstimates wait until system.size_estimates has at least a row that has non-null mean_partition_size --- .../dcfaa83d-a12c-48e1-9e51-b8d3808cd287 | 2 +- .../cassandra/source/split/SplitsGenerator.java | 19 ++++++++++++------- .../connector/cassandra/CassandraTestEnvironment.java | 17 +++++++++++++++-- .../cassandra/source/CassandraSourceITCase.java | 10 ++++------ 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 index 65e52c7..3fc516c 100644 --- a/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 +++ b/flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287 @@ -14,7 +14,7 @@ Method <org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.build Method <org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.validateCommonParameters()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSourceBuilder.java:185) Method <org.apache.flink.connector.cassandra.source.CassandraSourceBuilder.validateCommonParameters()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSourceBuilder.java:186) Method <org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0) -Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0) +Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.getEstimatedTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0) Method <org.apache.flink.connector.cassandra.source.utils.QueryValidator.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (QueryValidator.java:57) Method <org.apache.flink.connector.cassandra.source.utils.QueryValidator.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (QueryValidator.java:61) Method <org.apache.flink.connector.cassandra.source.utils.QueryValidator.extractFilteringColumns(java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (QueryValidator.java:0) diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java index 98585c6..9676805 100644 --- a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java @@ -49,6 +49,7 @@ public final class SplitsGenerator { private final String table; private final int parallelism; private final long maxSplitMemorySize; + private long estimatedTableSize; public SplitsGenerator( CassandraPartitioner partitioner, @@ -91,8 +92,8 @@ public final class SplitsGenerator { */ private long decideOnNumSplits() { long numSplits; - final long estimateTableSize = estimateTableSize(); - if (estimateTableSize == 0) { // size estimates unavailable + estimatedTableSize = estimateTableSize(); + if (estimatedTableSize == 0) { // size estimates unavailable LOG.info( "Cassandra size estimates are not available for {}.{} table. Creating as many splits as parallelism ({})", keyspace, @@ -105,11 +106,11 @@ public final class SplitsGenerator { "Estimated size for {}.{} table is {} bytes", keyspace, table, - estimateTableSize); + estimatedTableSize); numSplits = - estimateTableSize / maxSplitMemorySize == 0 + estimatedTableSize / maxSplitMemorySize == 0 ? parallelism - : estimateTableSize / maxSplitMemorySize; + : estimatedTableSize / maxSplitMemorySize; LOG.info( "maxSplitMemorySize set value ({}) leads to the creation of {} splits", maxSplitMemorySize, @@ -123,8 +124,7 @@ public final class SplitsGenerator { * just inserted and the amount of data in the table was small. This is very common situation * during tests. */ - @VisibleForTesting - public long estimateTableSize() { + private long estimateTableSize() { List<TokenRange> tokenRanges = getTokenRangesOfTable(); long size = 0L; for (TokenRange tokenRange : tokenRanges) { @@ -229,6 +229,11 @@ public final class SplitsGenerator { } } + @VisibleForTesting + public long getEstimatedTableSize() { + return estimatedTableSize; + } + private static class TokenRange { private final long partitionCount; private final long meanPartitionSize; diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java index dc783e9..24e8fda 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/CassandraTestEnvironment.java @@ -26,6 +26,7 @@ import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.SocketOptions; @@ -43,6 +44,8 @@ import org.testcontainers.utility.MountableFile; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; /** * Junit test environment that contains everything needed at the test suite level: testContainer @@ -86,7 +89,6 @@ public class CassandraTestEnvironment implements TestResource { + " (col1, col2, col3, col4)" + " VALUES (%d, %d, %d, %d)"; private static final int NB_SPLITS_RECORDS = 1000; - private static final long FLUSH_MEMTABLES_DELAY = 30_000L; @Container private final CassandraContainer cassandraContainer1; @Container private final CassandraContainer cassandraContainer2; @@ -235,13 +237,24 @@ public class CassandraTestEnvironment implements TestResource { void refreshSizeEstimates(String table) throws Exception { final ExecResult execResult1 = cassandraContainer1.execInContainer("nodetool", "flush", KEYSPACE, table); - Thread.sleep(FLUSH_MEMTABLES_DELAY); final ExecResult execResult2 = cassandraContainer1.execInContainer("nodetool", "refreshsizeestimates"); if (execResult1.getExitCode() != 0 || execResult2.getExitCode() != 0) { throw new RuntimeException( "Failed to refresh system.size_estimates on the Cassandra cluster"); } + List<Row> partitions = new ArrayList<>(); + while (partitions.isEmpty() + || partitions.stream().anyMatch(row -> row.getLong("mean_partition_size") == 0L)) { + Thread.sleep(1000); + partitions = + session.execute( + "SELECT range_start, range_end, partitions_count, mean_partition_size FROM " + + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?", + KEYSPACE, + table) + .all(); + } } public ResultSet executeRequestWithTimeout(String query) { diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java index 5268d6b..f74a41b 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java @@ -62,7 +62,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for the Cassandra source. */ class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { - private static final long EXPECTED_TABLE_SIZE = 51200L; @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); @TestExternalSystem @@ -152,10 +151,9 @@ class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { SPLITS_TABLE, parallelism, maxSplitMemorySize); - final long tableSize = generator.estimateTableSize(); - // sanity check to ensure that the size estimates were updated in the Cassandra cluster - assertThat(tableSize).isEqualTo(EXPECTED_TABLE_SIZE); final CassandraEnumeratorState cassandraEnumeratorState = generator.prepareSplits(); + final long tableSize = generator.getEstimatedTableSize(); + assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate()) // regular case .isEqualTo(tableSize / maxSplitMemorySize); @@ -177,9 +175,9 @@ class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { SPLITS_TABLE, parallelism, 100_000_000L); - // sanity check to ensure that the size estimates were updated in the Cassandra cluster - assertThat(generator.estimateTableSize()).isEqualTo(EXPECTED_TABLE_SIZE); final CassandraEnumeratorState cassandraEnumeratorState = generator.prepareSplits(); + final long tableSize = generator.getEstimatedTableSize(); + // maxSplitMemorySize is too high compared to table size. Falling back to parallelism splits // too low maxSplitMemorySize is guarded by an assertion > min at source creation assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
