zentol commented on code in PR #3: URL: https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1136881612
########## flink-connector-cassandra/pom.xml: ########## @@ -180,7 +187,7 @@ under the License. <scope>provided</scope> </dependency> - <!-- Test dependencies --> + <!-- Test dependencies --> Review Comment: ```suggestion <!-- Test dependencies --> ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java: ########## @@ -80,27 +85,55 @@ public class CassandraSource<OUT> implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> { - public static final String CQL_PROHIBITTED_CLAUSES_REGEXP = + public static final String CQL_PROHIBITED_CLAUSES_REGEXP = "(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*"; + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; + private static final long serialVersionUID = 1L; private final ClusterBuilder clusterBuilder; + @Nullable private final Long maxSplitMemorySize; private final Class<OUT> pojoClass; private final String query; + private final String keyspace; + private final String table; private final MapperOptions mapperOptions; public CassandraSource( ClusterBuilder clusterBuilder, Class<OUT> pojoClass, String query, MapperOptions mapperOptions) { + this(clusterBuilder, null, pojoClass, query, mapperOptions); + } + + public CassandraSource( + ClusterBuilder clusterBuilder, + Long maxSplitMemorySize, + Class<OUT> pojoClass, + String query, + MapperOptions mapperOptions) { checkNotNull(clusterBuilder, "ClusterBuilder required but not provided"); + checkState( + maxSplitMemorySize == null || maxSplitMemorySize > 0, + "Max split size in bytes provided but set to an invalid value {}", + maxSplitMemorySize); checkNotNull(pojoClass, "POJO class required but not provided"); - checkQueryValidity(query); Review Comment: I feel like we could've kept this method if we had it return a `Matcher`. 
########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayDeque; +import java.util.Queue; + +/** Serializer for {@link CassandraEnumeratorState}. */ +public class CassandraEnumeratorStateSerializer Review Comment: Add private constructor to enforce singleton usage? ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.testframe.TestResource; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.Statement; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.net.InetSocketAddress; + +/** + * Junit test environment that contains everything needed at the test suite level: testContainer + * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup). 
+ */ +@Testcontainers +public class CassandraTestEnvironment implements TestResource { + private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class); + private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8"; + private static final int CQL_PORT = 9042; + + private static final int READ_TIMEOUT_MILLIS = 36000; + + private static final long FLUSH_MEMTABLES_DELAY = + 30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay. Review Comment: ```suggestion // flushing mem table to SS tables is an asynchronous operation that may take a while private static final long FLUSH_MEMTABLES_DELAY = 30_000L; ``` ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. 
*/ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling 
back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); + } + + @TestTemplate + @DisplayName("Test splitting with a too big split size set") + public void testGenerateSplitsWithTooBigSize( Review Comment: ```suggestion public void testGenerateSplitsWithTooLargeSize( ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator; +import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import javax.annotation.Nullable; + +import java.util.regex.Matcher; +import 
java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bounded source to read from Cassandra and return a collection of entities as {@code + * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code + * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described + * in <a + * href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/"> + * Cassandra object mapper</a>). + * + * <p>To use it, do the following: + * + * <pre>{@code + * ClusterBuilder clusterBuilder = new ClusterBuilder() { + * @Override + * protected Cluster buildCluster(Cluster.Builder builder) { + * return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT)) + * .withQueryOptions(new QueryOptions().setConsistencyLevel(CL)) + * .withSocketOptions(new SocketOptions() + * .setConnectTimeoutMillis(CONNECT_TIMEOUT) + * .setReadTimeoutMillis(READ_TIMEOUT)) + * .build(); + * } + * }; + * long maxSplitMemorySize = ... //optional max split size in bytes. If not set, maxSplitMemorySize = tableSize / parallelism + * Source cassandraSource = new CassandraSource(clusterBuilder, + * maxSplitMemorySize, + * Pojo.class, + * "select ... from KEYSPACE.TABLE ...;", + * () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + * + * DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), + * "CassandraSource"); + * }</pre> + */ +@PublicEvolving +public class CassandraSource<OUT> + implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> { + + public static final String CQL_PROHIBITED_CLAUSES_REGEXP = + "(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*"; + public static final String SELECT_REGEXP = "(?i)select .+ from (\\w+)\\.(\\w+).*;$"; Review Comment: nit: Both of these could be a `Pattern`. 
########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import 
org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. */ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> 
splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); Review 
Comment: ```suggestion long tableSize = generator.estimateTableSize(); assertThat(tableSize).isEqualTo(35840L); List<CassandraSplit> splits = generator.generateSplits(); assertThat(splits.size()).isEqualTo(tableSize / maxSplitMemorySize); ``` ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.connector.cassandra.source.CassandraSource; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; + +/** tests for query generation and query sanity checks. 
*/ +class CassandraQueryTest { + + private static final Pattern SELECT_PATTERN = Pattern.compile(CassandraSource.SELECT_REGEXP); + + @Test + public void testKeySpaceTableExtractionRegexp() { + Arrays.asList( + "select field FROM keyspace.table where field = value;", + "select * FROM keyspace.table;", + "select field1, field2 from keyspace.table;", + "select field1, field2 from keyspace.table LIMIT(1000);", + "select field1 from keyspace.table ;", + "select field1 from keyspace.table where field1=1;") + .forEach(CassandraQueryTest::assertQueryFormatCorrect); + + Arrays.asList( + "select field1 from table;", // missing keyspace + "select field1 from keyspace.table" // missing ";" + ) + .forEach(CassandraQueryTest::assertQueryFormatIncorrect); + } + + @Test + public void testProhibitedClauses() { + Arrays.asList( + "SELECT COUNT(*) from flink.table;", + "SELECT AVG(*) from flink.table;", + "SELECT MIN(*) from flink.table;", + "SELECT MAX(*) from flink.table;", + "SELECT SUM(*) from flink.table;", + "SELECT field1, field2 from flink.table ORDER BY field1;", + "SELECT field1, field2 from flink.table GROUP BY field1;") + .forEach(CassandraQueryTest::assertProhibitedClauseRejected); + } + + @Test + public void testGenerateRangeQuery() { + String query; + String outputQuery; + + // query with where clause + query = "SELECT field FROM keyspace.table WHERE field = value;"; + outputQuery = CassandraSplitReader.generateRangeQuery(query, "field"); + assertThat(outputQuery) + .isEqualTo( + "SELECT field FROM keyspace.table WHERE (token(field) >= ?) AND (token(field) < ?) AND field = value;"); + + // query without where clause + query = "SELECT * FROM keyspace.table;"; + outputQuery = CassandraSplitReader.generateRangeQuery(query, "field"); + assertThat(outputQuery) + .isEqualTo( + "SELECT * FROM keyspace.table WHERE (token(field) >= ?) AND (token(field) < ?);"); + + // query without where clause but with another trailing clause Review Comment: also test WHERE + LIMIT? 
########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import 
org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. */ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> 
splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); + } + + 
@TestTemplate + @DisplayName("Test splitting with a too big split size set") + public void testGenerateSplitsWithTooBigSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 20; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 100_000_000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // tableSize / maxSplitMemorySize is too little compared to parallelism falling back to + // number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + } + + @TestTemplate + @DisplayName("Test splitting with a too small split size set") + public void testGenerateSplitsWithTooSmallSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 1L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + + // tableSize / maxSplitMemorySize is too big compared to parallelism falling back to Review Comment: Reminder for me to check how this is determined. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ExecutionInfo; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +/** + * {@link RecordEmitter} that converts the {@link CassandraRow} read by the {@link + * CassandraSplitReader} to specified POJO and output it. This class uses the Cassandra driver + * mapper to map the row to the POJO. 
+ * + * @param <OUT> type of POJO record to output + */ +class CassandraRecordEmitter<OUT> implements RecordEmitter<CassandraRow, OUT, CassandraSplit> { + + private final Function<ResultSet, OUT> map; + + public CassandraRecordEmitter(Function<ResultSet, OUT> map) { + this.map = map; + } + + @Override + public void emitRecord( + CassandraRow cassandraRow, SourceOutput<OUT> output, CassandraSplit cassandraSplit) { + final Row row = cassandraRow.getRow(); + // Mapping from a row to a Class<OUT> is a complex operation involving reflection API. + // It is better to use Cassandra mapper for it. + // but the mapper takes only a resultSet as input hence forging one containing only the row + ResultSet resultSet = + new ResultSet() { Review Comment: Create a dedicated class that we pass the row into as a constructor argument. Saves us from defining a new class for each row. ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. 
*/ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling 
back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); Review Comment: add a comment or error message that this is a sanity check for data being flushed to SSTs ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.annotation.VisibleForTesting; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s based on Cassandra cluster partitioner and cluster + * statistics. It estimates the total size of the table using Cassandra system table + * system.size_estimates. But there is no way to estimate the size of the data with the optional SQL + * filters without reading the data. So the splits can be smaller than {@param maxSplitMemorySize} + * when the query is executed. + */ +public final class SplitsGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + private static final int ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO = 10; + + private final CassandraPartitioner partitioner; + private final Session session; + private final String keyspace; + private final String table; + private final int parallelism; + @Nullable private final Long maxSplitMemorySize; + + public SplitsGenerator( + CassandraPartitioner partitioner, + Session session, + String keyspace, + String table, + int parallelism, + Long maxSplitMemorySize) { + this.partitioner = partitioner; + this.session = session; + this.keyspace = keyspace; + this.table = table; + this.parallelism = parallelism; + this.maxSplitMemorySize = maxSplitMemorySize; + } + + /** + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the + * Cassandra ring of {@param maxSplitMemorySize}. If {@param maxSplitMemorySize} is not defined, + * or is too high or too low compared to the task parallelism, then it generates as many {@link + * CassandraSplit}s as the task parallelism. 
+ * + * @return list containing {@code numSplits} CassandraSplits. + */ + public List<CassandraSplit> generateSplits() { + long numSplits; + if (maxSplitMemorySize != null) { + final long estimateTableSize = estimateTableSize(); + LOG.debug("Estimated table size for table {} is {} bytes", table, estimateTableSize); + numSplits = estimateTableSize / maxSplitMemorySize; + if (numSplits == 0 // estimateTableSize can be null in some cases (see javadoc) + || numSplits < parallelism / ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO // too low + || numSplits + > (long) parallelism + * ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO) { // too high Review Comment: Not sure how you arrived at this particular ratio; 10 splits per subtask seems perfectly fine to me. Overall it seems a bit...arbitrary? I don't think we should limit this ratio; so long as things work we're good, no? If someone wants to process a 1GB table in 1 MB chunks with 1 subtask they should be free to do so. We can solve the number of in-memory splits by generating them lazily instead. I'd be inclined to drop this ratio stuff and enforce a minimum size (a few MB I guess) instead to avoid some stupid cases (like 1). As-is this can just blow up suddenly in surprising ways. For example, a periodic batch job against a growing table may suddenly crash with an OOM because they now hit this ratio and get massive splits instead. ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.annotation.VisibleForTesting; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s based on Cassandra cluster partitioner and cluster + * statistics. It estimates the total size of the table using Cassandra system table + * system.size_estimates. But there is no way to estimate the size of the data with the optional SQL + * filters without reading the data. So the splits can be smaller than {@param maxSplitMemorySize} + * when the query is executed. 
+ */ +public final class SplitsGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + private static final int ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO = 10; + + private final CassandraPartitioner partitioner; + private final Session session; + private final String keyspace; + private final String table; + private final int parallelism; + @Nullable private final Long maxSplitMemorySize; + + public SplitsGenerator( + CassandraPartitioner partitioner, + Session session, + String keyspace, + String table, + int parallelism, + Long maxSplitMemorySize) { + this.partitioner = partitioner; + this.session = session; + this.keyspace = keyspace; + this.table = table; + this.parallelism = parallelism; + this.maxSplitMemorySize = maxSplitMemorySize; + } + + /** + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the + * Cassandra ring of {@param maxSplitMemorySize}. If {@param maxSplitMemorySize} is not defined, + * or is too high or too low compared to the task parallelism, then it generates as many {@link + * CassandraSplit}s as the task parallelism. + * + * @return list containing {@code numSplits} CassandraSplits. + */ + public List<CassandraSplit> generateSplits() { + long numSplits; + if (maxSplitMemorySize != null) { + final long estimateTableSize = estimateTableSize(); + LOG.debug("Estimated table size for table {} is {} bytes", table, estimateTableSize); + numSplits = estimateTableSize / maxSplitMemorySize; + if (numSplits == 0 // estimateTableSize can be null in some cases (see javadoc) + || numSplits < parallelism / ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO // too low + || numSplits + > (long) parallelism + * ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO) { // too high + LOG.info( + "maxSplitMemorySize set value leads to {} splits with a task parallelism of {}. 
Creating as many splits as parallelism", + numSplits, + parallelism); + numSplits = parallelism; + } + } else { // not defined + LOG.info("maxSplitMemorySize not set. Creating as many splits as parallelism"); Review Comment: ```suggestion LOG.info("maxSplitMemorySize not set. Creating as many splits as parallelism ({})", parallelism); ``` ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.annotation.VisibleForTesting; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +/** + * This class generates {@link CassandraSplit}s based on Cassandra cluster partitioner and cluster + * statistics. It estimates the total size of the table using Cassandra system table + * system.size_estimates. 
But there is no way to estimate the size of the data with the optional SQL + * filters without reading the data. So the splits can be smaller than {@param maxSplitMemorySize} + * when the query is executed. + */ +public final class SplitsGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + private static final int ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO = 10; + + private final CassandraPartitioner partitioner; + private final Session session; + private final String keyspace; + private final String table; + private final int parallelism; + @Nullable private final Long maxSplitMemorySize; + + public SplitsGenerator( + CassandraPartitioner partitioner, + Session session, + String keyspace, + String table, + int parallelism, + Long maxSplitMemorySize) { + this.partitioner = partitioner; + this.session = session; + this.keyspace = keyspace; + this.table = table; + this.parallelism = parallelism; + this.maxSplitMemorySize = maxSplitMemorySize; + } + + /** + * Split Cassandra tokens ring into {@link CassandraSplit}s containing each a range of the + * Cassandra ring of {@param maxSplitMemorySize}. If {@param maxSplitMemorySize} is not defined, + * or is too high or too low compared to the task parallelism, then it generates as many {@link + * CassandraSplit}s as the task parallelism. + * + * @return list containing {@code numSplits} CassandraSplits. + */ + public List<CassandraSplit> generateSplits() { + long numSplits; + if (maxSplitMemorySize != null) { + final long estimateTableSize = estimateTableSize(); + LOG.debug("Estimated table size for table {} is {} bytes", table, estimateTableSize); + numSplits = estimateTableSize / maxSplitMemorySize; + if (numSplits == 0 // estimateTableSize can be null in some cases (see javadoc) + || numSplits < parallelism / ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO // too low Review Comment: ```suggestion || numSplits < parallelism // too low ``` I don't understand why there should be a ratio here. 
If `numSplits` is 10, then why should we treat p=90 and p=100 differently? ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. 
*/ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling 
back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); + } + + @TestTemplate + @DisplayName("Test splitting with a too big split size set") + public void testGenerateSplitsWithTooBigSize( Review Comment: The test name is a bit ambiguous in that it's not clear whether it refers to the size of: * the split * the table * the max split size Maybe name it `testGenerateSplitsWithTableSizeLowerThanMaximumSplitSize` ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. */ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + 
DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); Review Comment: I guess we can't really verify the actual size of a split (without reading it)? ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.connector.cassandra.source.CassandraSource; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; + +/** tests for query generation and query sanity checks. */ +class CassandraQueryTest { + + private static final Pattern SELECT_PATTERN = Pattern.compile(CassandraSource.SELECT_REGEXP); + + @Test + public void testKeySpaceTableExtractionRegexp() { + Arrays.asList( + "select field FROM keyspace.table where field = value;", + "select * FROM keyspace.table;", + "select field1, field2 from keyspace.table;", + "select field1, field2 from keyspace.table LIMIT(1000);", + "select field1 from keyspace.table ;", + "select field1 from keyspace.table where field1=1;") + .forEach(CassandraQueryTest::assertQueryFormatCorrect); + + Arrays.asList( + "select field1 from table;", // missing keyspace + "select field1 from keyspace.table" // missing ";" + ) Review Comment: ```suggestion "select field1 from table;", // missing keyspace "select field1 from .table;", // missing keyspace "select field1 from keyspace;", // missing table "select field1 from keyspace.;", // missing table "select field1 from keyspace.table" // missing ";" ) ``` Some more cases we could consider. 
In particular the `.table` variants could be interesting because they'd happen when users use something like `${keyspace}.${table}` in a script but one of the variables is empty/undefined. ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. 
*/ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling 
back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); + } + + @TestTemplate + @DisplayName("Test splitting with a too big split size set") + public void testGenerateSplitsWithTooBigSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 20; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 100_000_000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // tableSize / maxSplitMemorySize is too little compared to parallelism falling back to + // number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + } + + @TestTemplate + @DisplayName("Test splitting with a too small split size set") + public void 
testGenerateSplitsWithTooSmallSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 1L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + + // tableSize / maxSplitMemorySize is too big compared to parallelism falling back to + // number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + } + + // overridden to use unordered checks + @Override + protected void checkResultWithSemantic( + CloseableIterator<Pojo> resultIterator, + List<List<Pojo>> testData, + CheckpointingMode semantic, + Integer limit) { + if (limit != null) { + Runnable runnable = + () -> + CollectIteratorAssertions.assertUnordered(resultIterator) + .withNumRecordsLimit(limit) + .matchesRecordsFromSource(testData, semantic); + + assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT); Review Comment: Why is this using runAsync here but not in the else branch? ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. */ +class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> { + + @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment(); + + @TestExternalSystem + CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(); + + @TestSemantics + CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + + @TestContext + CassandraTestContextFactory contextFactory = + new CassandraTestContextFactory(cassandraTestEnvironment); + + @TestTemplate + @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)") + public void testGenerateSplitsMurMur3Partitioner( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)"); + assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)"); + } + + @TestTemplate + @DisplayName("Test basic splitting with RANDOMPARTITIONER") + public void testGenerateSplitsRandomPartitioner( + TestEnvironment testEnv, + 
DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) { + final int parallelism = 2; + final SplitsGenerator generator = + new SplitsGenerator( + RANDOMPARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + null); + List<CassandraSplit> splits = generator.generateSplits(); + + // no maxSplitMemorySize specified falling back number of splits = parallelism + assertThat(splits.size()).isEqualTo(parallelism); + assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)"); + assertThat(splits.get(1).splitId()) + .isEqualTo( + "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)"); + } + + @TestTemplate + @DisplayName("Test splitting with a correct split size set") + public void testGenerateSplitsWithCorrectSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 2; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + parallelism, + 10000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // nb splits = tableSize / maxSplitMemorySize + assertThat(splits.size()).isEqualTo(3); + } + + @TestTemplate + @DisplayName("Test splitting with a too big split size set") + public void testGenerateSplitsWithTooBigSize( + TestEnvironment testEnv, + DataStreamSourceExternalContext<Pojo> externalContext, + CheckpointingMode semantic) + throws Exception { + final int parallelism = 20; + SplitsGenerator generator = + new SplitsGenerator( + MURMUR3PARTITIONER, + cassandraTestEnvironment.getSession(), + CassandraTestEnvironment.KEYSPACE, + CassandraTestEnvironment.SPLITS_TABLE, + 
parallelism, + 100_000_000L); + assertThat(generator.estimateTableSize()).isEqualTo(35840L); + List<CassandraSplit> splits = generator.generateSplits(); + // tableSize / maxSplitMemorySize is too little compared to parallelism falling back to + // number of splits = parallelism Review Comment: Wondering if this shouldn't result in a single split instead :thinking: ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.testframe.TestResource; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.Statement; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.net.InetSocketAddress; + +/** + * Junit test environment that contains everything needed at the test suite level: testContainer + * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup). + */ +@Testcontainers +public class CassandraTestEnvironment implements TestResource { + private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class); + private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8"; + private static final int CQL_PORT = 9042; + + private static final int READ_TIMEOUT_MILLIS = 36000; + + private static final long FLUSH_MEMTABLES_DELAY = + 30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay. 
+ + static final String KEYSPACE = "flink"; + + private static final String CREATE_KEYSPACE_QUERY = + "CREATE KEYSPACE " + + KEYSPACE + + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + + static final String SPLITS_TABLE = "flinksplits"; + private static final String CREATE_SPLITS_TABLE_QUERY = + "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);"; + private static final String INSERT_INTO_FLINK_SPLITS = + "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)"; + private static final int NB_SPLITS_RECORDS = 1000; + + @Container private final CassandraContainer cassandraContainer; + + private Cluster cluster; + private Session session; + private ClusterBuilder clusterBuilder; + + public CassandraTestEnvironment() { + cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE); + // more generous timeouts + addJavaOpts( + cassandraContainer, + "-Dcassandra.request_timeout_in_ms=30000", + "-Dcassandra.read_request_timeout_in_ms=15000", + "-Dcassandra.write_request_timeout_in_ms=6000"); + } + + @Override + public void startUp() throws Exception { + startEnv(); + } + + @Override + public void tearDown() throws Exception { + stopEnv(); + } + + private static void addJavaOpts(GenericContainer<?> container, String... 
opts) { + String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", ""); + container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " ")); + } + + private void startEnv() throws Exception { + // configure container start to wait until cassandra is ready to receive queries + cassandraContainer.waitingFor(new CassandraQueryWaitStrategy()); + // start with retrials + cassandraContainer.start(); + cassandraContainer.followOutput( + new Slf4jLogConsumer(LOG), + OutputFrame.OutputType.END, + OutputFrame.OutputType.STDERR, + OutputFrame.OutputType.STDOUT); + + cluster = cassandraContainer.getCluster(); + clusterBuilder = + createBuilderWithConsistencyLevel( + ConsistencyLevel.ONE, + cassandraContainer.getHost(), + cassandraContainer.getMappedPort(CQL_PORT)); + + session = cluster.connect(); + session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY)); + // create a dedicated table for split size tests (to avoid having to flush with each test) + insertTestDataForSplitSizeTests(); Review Comment: Note: If we ever use this environment in multiple tests we'd want this to be an opt-in thing so we don't wait unnecessarily for a table that we don't even need. ########## flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.testframe.TestResource; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.Statement; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.CassandraContainer; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.OutputFrame; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.CassandraQueryWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.net.InetSocketAddress; + +/** + * Junit test environment that contains everything needed at the test suite level: testContainer + * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder setup). 
+ */ +@Testcontainers +public class CassandraTestEnvironment implements TestResource { + private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class); + private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8"; + private static final int CQL_PORT = 9042; + + private static final int READ_TIMEOUT_MILLIS = 36000; + + private static final long FLUSH_MEMTABLES_DELAY = + 30_000L; // updating flushing mem table to SS tables is long, it is the minimum delay. + + static final String KEYSPACE = "flink"; + + private static final String CREATE_KEYSPACE_QUERY = + "CREATE KEYSPACE " + + KEYSPACE + + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + + static final String SPLITS_TABLE = "flinksplits"; + private static final String CREATE_SPLITS_TABLE_QUERY = + "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);"; + private static final String INSERT_INTO_FLINK_SPLITS = + "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)"; + private static final int NB_SPLITS_RECORDS = 1000; + + @Container private final CassandraContainer cassandraContainer; + + private Cluster cluster; + private Session session; + private ClusterBuilder clusterBuilder; + + public CassandraTestEnvironment() { + cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE); + // more generous timeouts + addJavaOpts( + cassandraContainer, + "-Dcassandra.request_timeout_in_ms=30000", + "-Dcassandra.read_request_timeout_in_ms=15000", + "-Dcassandra.write_request_timeout_in_ms=6000"); + } + + @Override + public void startUp() throws Exception { + startEnv(); + } + + @Override + public void tearDown() throws Exception { + stopEnv(); + } + + private static void addJavaOpts(GenericContainer<?> container, String... 
opts) { + String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", ""); + container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " ")); + } + + private void startEnv() throws Exception { + // configure container start to wait until cassandra is ready to receive queries + cassandraContainer.waitingFor(new CassandraQueryWaitStrategy()); + // start with retrials + cassandraContainer.start(); + cassandraContainer.followOutput( + new Slf4jLogConsumer(LOG), + OutputFrame.OutputType.END, + OutputFrame.OutputType.STDERR, + OutputFrame.OutputType.STDOUT); + + cluster = cassandraContainer.getCluster(); + clusterBuilder = + createBuilderWithConsistencyLevel( + ConsistencyLevel.ONE, + cassandraContainer.getHost(), + cassandraContainer.getMappedPort(CQL_PORT)); + + session = cluster.connect(); + session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY)); + // create a dedicated table for split size tests (to avoid having to flush with each test) + insertTestDataForSplitSizeTests(); + } + + private void insertTestDataForSplitSizeTests() throws Exception { + session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY)); + for (int i = 0; i < NB_SPLITS_RECORDS; i++) { + session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i))); + } + flushMemTables(SPLITS_TABLE); + } + + private void stopEnv() { + + if (session != null) { + session.close(); + } + if (cluster != null) { + cluster.close(); + } + cassandraContainer.stop(); + } + + private ClusterBuilder createBuilderWithConsistencyLevel( + ConsistencyLevel consistencyLevel, String host, int port) { + return new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPointsWithPorts(new InetSocketAddress(host, port)) + .withQueryOptions( + new QueryOptions() + .setConsistencyLevel(consistencyLevel) + .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL)) + .withSocketOptions( + new SocketOptions() + // default timeout x 3 + 
.setConnectTimeoutMillis(15000) + // default timeout x3 and higher than + // request_timeout_in_ms at the cluster level + .setReadTimeoutMillis(READ_TIMEOUT_MILLIS)) + .withoutJMXReporting() + .withoutMetrics() + .build(); + } + }; + } + + /** + * Force the flush of cassandra memTables to SSTables in order to update size_estimates. It is + * needed for the tests because we just inserted records, we need to force cassandra to update + * size_estimates system table. + */ + void flushMemTables(String table) throws Exception { + cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, table); Review Comment: What's the difference between `flush` and https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/tools/toolsRefreshSizeEstimates.html? ########## flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Metadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator + implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); + + private final SplitEnumeratorContext<CassandraSplit> enumeratorContext; + private final CassandraEnumeratorState state; + private final Cluster cluster; + + public CassandraSplitEnumerator( + SplitEnumeratorContext<CassandraSplit> enumeratorContext, + CassandraEnumeratorState state, + ClusterBuilder clusterBuilder) { + this.enumeratorContext = enumeratorContext; + this.state = state == null ? 
new CassandraEnumeratorState() : state /* snapshot restore*/; + this.cluster = clusterBuilder.getCluster(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + checkReaderRegistered(subtaskId); + final CassandraSplit cassandraSplit = state.getASplit(); + if (cassandraSplit != null) { + LOG.info("Assigning splits to reader {}", subtaskId); + enumeratorContext.assignSplit(cassandraSplit, subtaskId); + } else { + LOG.info( + "No split assigned to reader {} because the enumerator has no unassigned split left", + subtaskId); + } + if (!state.hasMoreSplits()) { + LOG.info( + "No more CassandraSplits to assign. Sending NoMoreSplitsEvent to reader {}.", + subtaskId); + enumeratorContext.signalNoMoreSplits(subtaskId); + } + } + + @Override + public void start() { + // discover the splits and update unprocessed splits and then assign them. + // There is only an initial splits discovery, no periodic discovery. + enumeratorContext.callAsync( + this::discoverSplits, + (splits, throwable) -> { + LOG.info("Add {} splits to CassandraSplitEnumerator.", splits.size()); + state.addNewSplits(splits); + }); + } + + private List<CassandraSplit> discoverSplits() { + final int numberOfSplits = enumeratorContext.currentParallelism(); + final Metadata clusterMetadata = cluster.getMetadata(); + final String partitionerName = clusterMetadata.getPartitioner(); + final SplitsGenerator.CassandraPartitioner partitioner = + partitionerName.contains(MURMUR3PARTITIONER.className()) + ? MURMUR3PARTITIONER + : RANDOMPARTITIONER; + return new SplitsGenerator(partitioner).generateSplits(numberOfSplits); Review Comment: Looking at SplitsGenerator#generateSplits it should be straightforward to lazily generate splits, no? The start-/endToken, increment and numSplits would be your state. This seems safer than adding safeguards for split counts, whose effectiveness/danger would imo be difficult to assess. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org