zentol commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1136881612


##########
flink-connector-cassandra/pom.xml:
##########
@@ -180,7 +187,7 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
-               <!-- Test dependencies -->
+       <!-- Test dependencies -->

Review Comment:
   ```suggestion
                <!-- Test dependencies -->
   ```



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -80,27 +85,55 @@
 public class CassandraSource<OUT>
         implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, 
ResultTypeQueryable<OUT> {
 
-    public static final String CQL_PROHIBITTED_CLAUSES_REGEXP =
+    public static final String CQL_PROHIBITED_CLAUSES_REGEXP =
             "(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*";
+    public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";
+
     private static final long serialVersionUID = 1L;
 
     private final ClusterBuilder clusterBuilder;
+    @Nullable private final Long maxSplitMemorySize;
     private final Class<OUT> pojoClass;
     private final String query;
+    private final String keyspace;
+    private final String table;
     private final MapperOptions mapperOptions;
 
     public CassandraSource(
             ClusterBuilder clusterBuilder,
             Class<OUT> pojoClass,
             String query,
             MapperOptions mapperOptions) {
+        this(clusterBuilder, null, pojoClass, query, mapperOptions);
+    }
+
+    public CassandraSource(
+            ClusterBuilder clusterBuilder,
+            Long maxSplitMemorySize,
+            Class<OUT> pojoClass,
+            String query,
+            MapperOptions mapperOptions) {
         checkNotNull(clusterBuilder, "ClusterBuilder required but not 
provided");
+        checkState(
+                maxSplitMemorySize == null || maxSplitMemorySize > 0,
+                "Max split size in bytes provided but set to an invalid value 
{}",
+                maxSplitMemorySize);
         checkNotNull(pojoClass, "POJO class required but not provided");
-        checkQueryValidity(query);

Review Comment:
   I feel like we could've kept this method if we'd have it return a `Matcher`.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** Serializer for {@link CassandraEnumeratorState}. */
+public class CassandraEnumeratorStateSerializer

Review Comment:
   Add private constructor to enforce singleton usage?



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Junit test environment that contains everything needed at the test suite 
level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder 
setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it 
is the minimum delay.

Review Comment:
   ```suggestion
       // flushing mem table to SS tables is an asynchronous operation that may 
take a while
       private static final long FLUSH_MEMTABLES_DELAY = 30_000L; 
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooBigSize(

Review Comment:
   ```suggestion
       public void testGenerateSplitsWithTooLargeSize(
   ```



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
+import 
org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
+import 
org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import 
org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import javax.annotation.Nullable;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bounded source to read from Cassandra and return a collection of entities 
as {@code
+ * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
+ * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing 
annotations (as described
+ * in <a
+ * 
href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/";>
+ * Cassandra object mapper</a>).
+ *
+ * <p>To use it, do the following:
+ *
+ * <pre>{@code
+ * ClusterBuilder clusterBuilder = new ClusterBuilder() {
+ *   @Override
+ *   protected Cluster buildCluster(Cluster.Builder builder) {
+ *     return builder.addContactPointsWithPorts(new 
InetSocketAddress(HOST,PORT))
+ *                   .withQueryOptions(new 
QueryOptions().setConsistencyLevel(CL))
+ *                   .withSocketOptions(new SocketOptions()
+ *                   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
+ *                   .setReadTimeoutMillis(READ_TIMEOUT))
+ *                   .build();
+ *   }
+ * };
+ * long maxSplitMemorySize = ... //optional max split size in bytes. If not 
set, maxSplitMemorySize = tableSize / parallelism
+ * Source cassandraSource = new CassandraSource(clusterBuilder,
+ *                                              maxSplitMemorySize,
+ *                                              Pojo.class,
+ *                                              "select ... from 
KEYSPACE.TABLE ...;",
+ *                                              () -> new Mapper.Option[] 
{Mapper.Option.saveNullFields(true)});
+ *
+ * DataStream<Pojo> stream = env.fromSource(cassandraSource, 
WatermarkStrategy.noWatermarks(),
+ * "CassandraSource");
+ * }</pre>
+ */
+@PublicEvolving
+public class CassandraSource<OUT>
+        implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, 
ResultTypeQueryable<OUT> {
+
+    public static final String CQL_PROHIBITED_CLAUSES_REGEXP =
+            "(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*";
+    public static final String SELECT_REGEXP = "(?i)select .+ from 
(\\w+)\\.(\\w+).*;$";

Review Comment:
   nit: Both of these could be a `Pattern`.



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);

Review Comment:
   ```suggestion
           long tableSize = generator.estimateTableSize();
           assertThat(tableSize).isEqualTo(35840L);
           List<CassandraSplit> splits = generator.generateSplits();
           assertThat(splits.size()).isEqualTo(tableSize / maxSplitMemorySize);
   ```



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** tests for query generation and query sanity checks. */
+class CassandraQueryTest {
+
+    private static final Pattern SELECT_PATTERN = 
Pattern.compile(CassandraSource.SELECT_REGEXP);
+
+    @Test
+    public void testKeySpaceTableExtractionRegexp() {
+        Arrays.asList(
+                        "select field FROM keyspace.table where field = 
value;",
+                        "select * FROM keyspace.table;",
+                        "select field1, field2 from keyspace.table;",
+                        "select field1, field2 from keyspace.table 
LIMIT(1000);",
+                        "select field1 from keyspace.table ;",
+                        "select field1 from keyspace.table where field1=1;")
+                .forEach(CassandraQueryTest::assertQueryFormatCorrect);
+
+        Arrays.asList(
+                        "select field1 from table;", // missing keyspace
+                        "select field1 from keyspace.table" // missing ";"
+                        )
+                .forEach(CassandraQueryTest::assertQueryFormatIncorrect);
+    }
+
+    @Test
+    public void testProhibitedClauses() {
+        Arrays.asList(
+                        "SELECT COUNT(*) from flink.table;",
+                        "SELECT AVG(*) from flink.table;",
+                        "SELECT MIN(*) from flink.table;",
+                        "SELECT MAX(*) from flink.table;",
+                        "SELECT SUM(*) from flink.table;",
+                        "SELECT field1, field2 from flink.table ORDER BY 
field1;",
+                        "SELECT field1, field2 from flink.table GROUP BY 
field1;")
+                .forEach(CassandraQueryTest::assertProhibitedClauseRejected);
+    }
+
+    @Test
+    public void testGenerateRangeQuery() {
+        String query;
+        String outputQuery;
+
+        // query with where clause
+        query = "SELECT field FROM keyspace.table WHERE field = value;";
+        outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+        assertThat(outputQuery)
+                .isEqualTo(
+                        "SELECT field FROM keyspace.table WHERE (token(field) 
>= ?) AND (token(field) < ?) AND field = value;");
+
+        // query without where clause
+        query = "SELECT * FROM keyspace.table;";
+        outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+        assertThat(outputQuery)
+                .isEqualTo(
+                        "SELECT * FROM keyspace.table WHERE (token(field) >= 
?) AND (token(field) < ?);");
+
+        // query without where clause but with another trailing clause

Review Comment:
   also test WHERE + LIMIT?



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooBigSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 20;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        100_000_000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // tableSize / maxSplitMemorySize is too little compared to 
parallelism falling back to
+        // number of splits = parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too small split size set")
+    public void testGenerateSplitsWithTooSmallSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        1L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // tableSize / maxSplitMemorySize is too big compared to parallelism 
falling back to

Review Comment:
   Reminder for me to check how this is determined.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ExecutionInfo;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * {@link RecordEmitter} that converts the {@link CassandraRow} read by the 
{@link
+ * CassandraSplitReader} to specified POJO and output it. This class uses the 
Cassandra driver
+ * mapper to map the row to the POJO.
+ *
+ * @param <OUT> type of POJO record to output
+ */
+class CassandraRecordEmitter<OUT> implements RecordEmitter<CassandraRow, OUT, 
CassandraSplit> {
+
+    private final Function<ResultSet, OUT> map;
+
+    public CassandraRecordEmitter(Function<ResultSet, OUT> map) {
+        this.map = map;
+    }
+
+    @Override
+    public void emitRecord(
+            CassandraRow cassandraRow, SourceOutput<OUT> output, 
CassandraSplit cassandraSplit) {
+        final Row row = cassandraRow.getRow();
+        // Mapping from a row to a Class<OUT> is a complex operation involving 
reflection API.
+        // It is better to use Cassandra mapper for it.
+        // but the mapper takes only a resultSet as input hence forging one 
containing only the row
+        ResultSet resultSet =
+                new ResultSet() {

Review Comment:
   Create a dedicated class that we pass the row into as a constructor 
argument. Saves us from defining a new class for each row.



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);

Review Comment:
   add a comment or error message that this is a sanity check for data being 
flushed to SSTs



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class generates {@link CassandraSplit}s based on Cassandra cluster 
partitioner and cluster
+ * statistics. It estimates the total size of the table using Cassandra system 
table
+ * system.size_estimates. But there is no way to estimate the size of the data 
with the optional SQL
+ * filters without reading the data. So the splits can be smaller than {@param 
maxSplitMemorySize}
+ * when the query is executed.
+ */
+public final class SplitsGenerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SplitsGenerator.class);
+    private static final int ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO = 10;
+
+    private final CassandraPartitioner partitioner;
+    private final Session session;
+    private final String keyspace;
+    private final String table;
+    private final int parallelism;
+    @Nullable private final Long maxSplitMemorySize;
+
+    public SplitsGenerator(
+            CassandraPartitioner partitioner,
+            Session session,
+            String keyspace,
+            String table,
+            int parallelism,
+            Long maxSplitMemorySize) {
+        this.partitioner = partitioner;
+        this.session = session;
+        this.keyspace = keyspace;
+        this.table = table;
+        this.parallelism = parallelism;
+        this.maxSplitMemorySize = maxSplitMemorySize;
+    }
+
+    /**
+     * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the
+     * Cassandra ring of {@param maxSplitMemorySize}. If {@param 
maxSplitMemorySize} is not defined,
+     * or is too high or too low compared to the task parallelism, then it 
generates as many {@link
+     * CassandraSplit}s as the task parallelism.
+     *
+     * @return list containing {@code numSplits} CassandraSplits.
+     */
+    public List<CassandraSplit> generateSplits() {
+        long numSplits;
+        if (maxSplitMemorySize != null) {
+            final long estimateTableSize = estimateTableSize();
+            LOG.debug("Estimated table size for table {} is {} bytes", table, 
estimateTableSize);
+            numSplits = estimateTableSize / maxSplitMemorySize;
+            if (numSplits == 0 // estimateTableSize can be null in some cases 
(see javadoc)
+                    || numSplits < parallelism / 
ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO // too low
+                    || numSplits
+                            > (long) parallelism
+                                    * ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO) { 
// too high

Review Comment:
   Not sure how you arrived at this particular ratio; 10 splits per subtasks 
seems perfectly fine to me. Overall it seems a bit...arbitrary?
   
   I don't think we should limit this ratio; so long as things work we're good, 
no?
   If someone wants to process a 1GB table in 1 MB chunks with 1 subtask they 
should be free to do so.
   We can solve the # number of in-memory splits by generating them lazily 
instead.
   
   I'd be inclined to drop this ratio stuff and enforce a minimum size (a few 
MB I guess) instead to avoid some stupid cases (like 1).
   
   As-is this can just blow-up suddenly in surprising ways. For example, a 
periodic batch job against a growing table may suddenly crash with a OOM 
because they now hit this ratio and get massive splits instead.



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class generates {@link CassandraSplit}s based on Cassandra cluster 
partitioner and cluster
+ * statistics. It estimates the total size of the table using Cassandra system 
table
+ * system.size_estimates. But there is no way to estimate the size of the data 
with the optional SQL
+ * filters without reading the data. So the splits can be smaller than {@param 
maxSplitMemorySize}
+ * when the query is executed.
+ */
+public final class SplitsGenerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SplitsGenerator.class);
+    private static final int ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO = 10;
+
+    private final CassandraPartitioner partitioner;
+    private final Session session;
+    private final String keyspace;
+    private final String table;
+    private final int parallelism;
+    @Nullable private final Long maxSplitMemorySize;
+
+    public SplitsGenerator(
+            CassandraPartitioner partitioner,
+            Session session,
+            String keyspace,
+            String table,
+            int parallelism,
+            Long maxSplitMemorySize) {
+        this.partitioner = partitioner;
+        this.session = session;
+        this.keyspace = keyspace;
+        this.table = table;
+        this.parallelism = parallelism;
+        this.maxSplitMemorySize = maxSplitMemorySize;
+    }
+
+    /**
+     * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the
+     * Cassandra ring of {@param maxSplitMemorySize}. If {@param 
maxSplitMemorySize} is not defined,
+     * or is too high or too low compared to the task parallelism, then it 
generates as many {@link
+     * CassandraSplit}s as the task parallelism.
+     *
+     * @return list containing {@code numSplits} CassandraSplits.
+     */
+    public List<CassandraSplit> generateSplits() {
+        long numSplits;
+        if (maxSplitMemorySize != null) {
+            final long estimateTableSize = estimateTableSize();
+            LOG.debug("Estimated table size for table {} is {} bytes", table, 
estimateTableSize);
+            numSplits = estimateTableSize / maxSplitMemorySize;
+            if (numSplits == 0 // estimateTableSize can be null in some cases 
(see javadoc)
+                    || numSplits < parallelism / 
ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO // too low
+                    || numSplits
+                            > (long) parallelism
+                                    * ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO) { 
// too high
+                LOG.info(
+                        "maxSplitMemorySize set value leads to {} splits with 
a task parallelism of {}. Creating as many splits as parallelism",
+                        numSplits,
+                        parallelism);
+                numSplits = parallelism;
+            }
+        } else { // not defined
+            LOG.info("maxSplitMemorySize not set. Creating as many splits as 
parallelism");

Review Comment:
   ```suggestion
               LOG.info("maxSplitMemorySize not set. Creating as many splits as 
parallelism ({})", parallelism);
   ```



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class generates {@link CassandraSplit}s based on Cassandra cluster 
partitioner and cluster
+ * statistics. It estimates the total size of the table using Cassandra system 
table
+ * system.size_estimates. But there is no way to estimate the size of the data 
with the optional SQL
+ * filters without reading the data. So the splits can be smaller than {@param 
maxSplitMemorySize}
+ * when the query is executed.
+ */
+public final class SplitsGenerator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SplitsGenerator.class);
+    private static final int ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO = 10;
+
+    private final CassandraPartitioner partitioner;
+    private final Session session;
+    private final String keyspace;
+    private final String table;
+    private final int parallelism;
+    @Nullable private final Long maxSplitMemorySize;
+
+    public SplitsGenerator(
+            CassandraPartitioner partitioner,
+            Session session,
+            String keyspace,
+            String table,
+            int parallelism,
+            Long maxSplitMemorySize) {
+        this.partitioner = partitioner;
+        this.session = session;
+        this.keyspace = keyspace;
+        this.table = table;
+        this.parallelism = parallelism;
+        this.maxSplitMemorySize = maxSplitMemorySize;
+    }
+
+    /**
+     * Split Cassandra tokens ring into {@link CassandraSplit}s containing 
each a range of the
+     * Cassandra ring of {@param maxSplitMemorySize}. If {@param 
maxSplitMemorySize} is not defined,
+     * or is too high or too low compared to the task parallelism, then it 
generates as many {@link
+     * CassandraSplit}s as the task parallelism.
+     *
+     * @return list containing {@code numSplits} CassandraSplits.
+     */
+    public List<CassandraSplit> generateSplits() {
+        long numSplits;
+        if (maxSplitMemorySize != null) {
+            final long estimateTableSize = estimateTableSize();
+            LOG.debug("Estimated table size for table {} is {} bytes", table, 
estimateTableSize);
+            numSplits = estimateTableSize / maxSplitMemorySize;
+            if (numSplits == 0 // estimateTableSize can be null in some cases 
(see javadoc)
+                    || numSplits < parallelism / 
ACCEPTABLE_NB_SPLIT_PARALLELISM_RATIO // too low

Review Comment:
   ```suggestion
                       || numSplits < parallelism // too low
   ```
   I don't understand why there should be a ratio here.
   If `numSplits` is 10, then why should we treat p=90 and p=100 differently?



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooBigSize(

Review Comment:
   The test name is a bit ambiguous in that it's not clear whether it refers to 
the size of:
   * the split
   * the table
   * the max split size
   
   Maybe name it `testGenerateSplitsWithTableSizeLowerThanMaximumSplitSize`



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);

Review Comment:
   I guess we can't really verify the actual size of a split (without reading 
it)?



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** tests for query generation and query sanity checks. */
+class CassandraQueryTest {
+
+    private static final Pattern SELECT_PATTERN = 
Pattern.compile(CassandraSource.SELECT_REGEXP);
+
+    @Test
+    public void testKeySpaceTableExtractionRegexp() {
+        Arrays.asList(
+                        "select field FROM keyspace.table where field = 
value;",
+                        "select * FROM keyspace.table;",
+                        "select field1, field2 from keyspace.table;",
+                        "select field1, field2 from keyspace.table 
LIMIT(1000);",
+                        "select field1 from keyspace.table ;",
+                        "select field1 from keyspace.table where field1=1;")
+                .forEach(CassandraQueryTest::assertQueryFormatCorrect);
+
+        Arrays.asList(
+                        "select field1 from table;", // missing keyspace
+                        "select field1 from keyspace.table" // missing ";"
+                        )

Review Comment:
   ```suggestion
                           "select field1 from table;", // missing keyspace
                           "select field1 from .table" // missing keyspace
                           "select field1 from keyspace;", // missing table
                           "select field1 from keyspace.;", // missing table
                           "select field1 from keyspace.table" // missing ";"
                           )
   ```
   Some more cases we could consider. In particular the `.table` variants could 
be interesting because they'd happen when users use something like 
`${keyspace}.${table}` in a script but one of the variables is empty/undefined.



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooBigSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 20;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        100_000_000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // tableSize / maxSplitMemorySize is too little compared to 
parallelism falling back to
+        // number of splits = parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too small split size set")
+    public void testGenerateSplitsWithTooSmallSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        1L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // tableSize / maxSplitMemorySize is too big compared to parallelism 
falling back to
+        // number of splits = parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+    }
+
+    // overridden to use unordered checks
+    @Override
+    protected void checkResultWithSemantic(
+            CloseableIterator<Pojo> resultIterator,
+            List<List<Pojo>> testData,
+            CheckpointingMode semantic,
+            Integer limit) {
+        if (limit != null) {
+            Runnable runnable =
+                    () ->
+                            
CollectIteratorAssertions.assertUnordered(resultIterator)
+                                    .withNumRecordsLimit(limit)
+                                    .matchesRecordsFromSource(testData, 
semantic);
+
+            
assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);

Review Comment:
   Why is this using runAsync here but not in the else branch?



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java:
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new 
MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new 
CassandraTestEnvironment();
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(-9223372036854775808,0)");
+        
assertThat(splits.get(1).splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        null);
+        List<CassandraSplit> splits = generator.generateSplits();
+
+        // no maxSplitMemorySize specified falling back number of splits = 
parallelism
+        assertThat(splits.size()).isEqualTo(parallelism);
+        
assertThat(splits.get(0).splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+        assertThat(splits.get(1).splitId())
+                .isEqualTo(
+                        
"(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        10000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // nb splits = tableSize / maxSplitMemorySize
+        assertThat(splits.size()).isEqualTo(3);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooBigSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 20;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        100_000_000L);
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        List<CassandraSplit> splits = generator.generateSplits();
+        // tableSize / maxSplitMemorySize is too little compared to 
parallelism falling back to
+        // number of splits = parallelism

Review Comment:
   Wondering if this shouldn't result in a single split instead :thinking: 



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Junit test environment that contains everything needed at the test suite 
level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder 
setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it 
is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int 
PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" 
+ " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment() {
+        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+        // more generous timeouts
+        addJavaOpts(
+                cassandraContainer,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000");
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        startEnv();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        stopEnv();
+    }
+
+    private static void addJavaOpts(GenericContainer<?> container, String... 
opts) {
+        String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+        container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " 
"));
+    }
+
+    private void startEnv() throws Exception {
+        // configure container start to wait until cassandra is ready to 
receive queries
+        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+        // start with retrials
+        cassandraContainer.start();
+        cassandraContainer.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cluster = cassandraContainer.getCluster();
+        clusterBuilder =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ONE,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
+
+        session = cluster.connect();
+        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        // create a dedicated table for split size tests (to avoid having to 
flush with each test)
+        insertTestDataForSplitSizeTests();

Review Comment:
   Note: If we ever use this environment in multiple tests we'd want this to be 
an opt-in thing so we don't wait unnecessarily for a table that we don't even 
need.



##########
flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Junit test environment that contains everything needed at the test suite 
level: testContainer
+ * setup, keyspace setup, Cassandra cluster/session management ClusterBuilder 
setup).
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    private static final long FLUSH_MEMTABLES_DELAY =
+            30_000L; // updating flushing mem table to SS tables is long, it 
is the minimum delay.
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int 
PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" 
+ " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment() {
+        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+        // more generous timeouts
+        addJavaOpts(
+                cassandraContainer,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000");
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        startEnv();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        stopEnv();
+    }
+
+    private static void addJavaOpts(GenericContainer<?> container, String... 
opts) {
+        String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+        container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " 
"));
+    }
+
+    private void startEnv() throws Exception {
+        // configure container start to wait until cassandra is ready to 
receive queries
+        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+        // start with retrials
+        cassandraContainer.start();
+        cassandraContainer.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cluster = cassandraContainer.getCluster();
+        clusterBuilder =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ONE,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
+
+        session = cluster.connect();
+        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        // create a dedicated table for split size tests (to avoid having to 
flush with each test)
+        insertTestDataForSplitSizeTests();
+    }
+
+    private void insertTestDataForSplitSizeTests() throws Exception {
+        session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+        for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
+            
session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, 
i)));
+        }
+        flushMemTables(SPLITS_TABLE);
+    }
+
+    private void stopEnv() {
+
+        if (session != null) {
+            session.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+        cassandraContainer.stop();
+    }
+
+    private ClusterBuilder createBuilderWithConsistencyLevel(
+            ConsistencyLevel consistencyLevel, String host, int port) {
+        return new ClusterBuilder() {
+            @Override
+            protected Cluster buildCluster(Cluster.Builder builder) {
+                return builder.addContactPointsWithPorts(new 
InetSocketAddress(host, port))
+                        .withQueryOptions(
+                                new QueryOptions()
+                                        .setConsistencyLevel(consistencyLevel)
+                                        
.setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                        .withSocketOptions(
+                                new SocketOptions()
+                                        // default timeout x 3
+                                        .setConnectTimeoutMillis(15000)
+                                        // default timeout x3 and higher than
+                                        // request_timeout_in_ms at the 
cluster level
+                                        
.setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+                        .withoutJMXReporting()
+                        .withoutMetrics()
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Force the flush of cassandra memTables to SSTables in order to update 
size_estimates. It is
+     * needed for the tests because we just inserted records, we need to force 
cassandra to update
+     * size_estimates system table.
+     */
+    void flushMemTables(String table) throws Exception {
+        cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, 
table);

Review Comment:
   what's the difference between `flush`and 
https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/tools/toolsRefreshSizeEstimates.html?



##########
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static 
org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+
+/** {@link SplitEnumerator} that splits Cassandra cluster into {@link 
CassandraSplit}s. */
+public final class CassandraSplitEnumerator
+        implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
+    private final CassandraEnumeratorState state;
+    private final Cluster cluster;
+
+    public CassandraSplitEnumerator(
+            SplitEnumeratorContext<CassandraSplit> enumeratorContext,
+            CassandraEnumeratorState state,
+            ClusterBuilder clusterBuilder) {
+        this.enumeratorContext = enumeratorContext;
+        this.state = state == null ? new CassandraEnumeratorState() : state /* 
snapshot restore*/;
+        this.cluster = clusterBuilder.getCluster();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        checkReaderRegistered(subtaskId);
+        final CassandraSplit cassandraSplit = state.getASplit();
+        if (cassandraSplit != null) {
+            LOG.info("Assigning splits to reader {}", subtaskId);
+            enumeratorContext.assignSplit(cassandraSplit, subtaskId);
+        } else {
+            LOG.info(
+                    "No split assigned to reader {} because the enumerator has 
no unassigned split left",
+                    subtaskId);
+        }
+        if (!state.hasMoreSplits()) {
+            LOG.info(
+                    "No more CassandraSplits to assign. Sending 
NoMoreSplitsEvent to reader {}.",
+                    subtaskId);
+            enumeratorContext.signalNoMoreSplits(subtaskId);
+        }
+    }
+
+    @Override
+    public void start() {
+        // discover the splits and update unprocessed splits and then assign 
them.
+        // There is only an initial splits discovery, no periodic discovery.
+        enumeratorContext.callAsync(
+                this::discoverSplits,
+                (splits, throwable) -> {
+                    LOG.info("Add {} splits to CassandraSplitEnumerator.", 
splits.size());
+                    state.addNewSplits(splits);
+                });
+    }
+
+    private List<CassandraSplit> discoverSplits() {
+        final int numberOfSplits = enumeratorContext.currentParallelism();
+        final Metadata clusterMetadata = cluster.getMetadata();
+        final String partitionerName = clusterMetadata.getPartitioner();
+        final SplitsGenerator.CassandraPartitioner partitioner =
+                partitionerName.contains(MURMUR3PARTITIONER.className())
+                        ? MURMUR3PARTITIONER
+                        : RANDOMPARTITIONER;
+        return new SplitsGenerator(partitioner).generateSplits(numberOfSplits);

Review Comment:
   Looking at SplitsGenerator#generateaSplits it should be straight-forward to 
lazily generate splits, no?
   The start-/endToken, increment and numSplits would be your state.
   
   This seems safer than to add safeguards for split counts, which'd be 
difficult to asses imo as to how effective/dangerous they are.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to