This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new 72e3bef  [FLINK-26822] Add Cassandra Source
72e3bef is described below

commit 72e3bef1fb9ee6042955b5e9871a9f70a8837cca
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Wed Mar 22 09:38:17 2023 +0100

    [FLINK-26822] Add Cassandra Source
---
 flink-connector-cassandra/pom.xml                  |   7 +
 .../cassandra/source/CassandraSource.java          | 203 +++++++++++++++++
 .../enumerator/CassandraEnumeratorState.java       | 167 ++++++++++++++
 .../CassandraEnumeratorStateSerializer.java        | 106 +++++++++
 .../enumerator/CassandraSplitEnumerator.java       | 148 +++++++++++++
 .../source/reader/CassandraRecordEmitter.java      | 138 ++++++++++++
 .../cassandra/source/reader/CassandraRow.java      |  46 ++++
 .../source/reader/CassandraSourceReader.java       | 104 +++++++++
 .../reader/CassandraSourceReaderFactory.java       |  55 +++++
 .../source/reader/CassandraSplitReader.java        | 205 +++++++++++++++++
 .../cassandra/source/split/CassandraSplit.java     |  75 +++++++
 .../source/split/CassandraSplitSerializer.java     |  63 ++++++
 .../cassandra/source/split/SplitsGenerator.java    | 242 +++++++++++++++++++++
 .../source/utils/BigIntegerSerializationUtils.java |  40 ++++
 .../cassandra/example/BatchPojoExample.java        |   1 +
 .../cassandra/source/CassandraSourceITCase.java    | 234 ++++++++++++++++++++
 .../cassandra/source/CassandraTestContext.java     | 161 ++++++++++++++
 .../cassandra/source/CassandraTestEnvironment.java | 196 +++++++++++++++++
 .../CassandraEnumeratorStateSerializerTest.java    |  58 +++++
 .../source/reader/CassandraQueryTest.java          | 119 ++++++++++
 .../source/split/CassandraSplitSerializerTest.java |  43 ++++
 .../cassandra/utils}/Pojo.java                     |  26 ++-
 tools/maven/suppressions.xml                       |   2 +-
 23 files changed, 2437 insertions(+), 2 deletions(-)

diff --git a/flink-connector-cassandra/pom.xml b/flink-connector-cassandra/pom.xml
index fa78a65..58d70b4 100644
--- a/flink-connector-cassandra/pom.xml
+++ b/flink-connector-cassandra/pom.xml
@@ -78,6 +78,13 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.scala-lang</groupId>
                        <artifactId>scala-library</artifactId>
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java
new file mode 100644
index 0000000..dd45913
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator;
+import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bounded source to read from Cassandra and return a collection of entities as {@code
+ * DataStream<Entity>}. An entity is built by Cassandra mapper ({@code
+ * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described
+ * in <a
+ * href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/">
+ * Cassandra object mapper</a>).
+ *
+ * <p>To use it, do the following:
+ *
+ * <pre>{@code
+ * ClusterBuilder clusterBuilder = new ClusterBuilder() {
+ *   @Override
+ *   protected Cluster buildCluster(Cluster.Builder builder) {
+ *     return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT))
+ *                   .withQueryOptions(new QueryOptions().setConsistencyLevel(CL))
+ *                   .withSocketOptions(new SocketOptions()
+ *                   .setConnectTimeoutMillis(CONNECT_TIMEOUT)
+ *                   .setReadTimeoutMillis(READ_TIMEOUT))
+ *                   .build();
+ *   }
+ * };
+ * long maxSplitMemorySize = ... //optional max split size in bytes. If not set, maxSplitMemorySize = tableSize / parallelism
+ * Source cassandraSource = new CassandraSource(clusterBuilder,
+ *                                              maxSplitMemorySize,
+ *                                              Pojo.class,
+ *                                              "select ... from KEYSPACE.TABLE ...;",
+ *                                              () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});
+ *
+ * DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(),
+ * "CassandraSource");
+ * }</pre>
+ */
+@PublicEvolving
+public class CassandraSource<OUT>
+        implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> {
+
+    public static final Pattern CQL_PROHIBITED_CLAUSES_REGEXP =
+            Pattern.compile("(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*");
+    public static final Pattern SELECT_REGEXP =
+            Pattern.compile("(?i)select .+ from (\\w+)\\.(\\w+).*;$");
+
+    private static final long serialVersionUID = 1L;
+
+    private final ClusterBuilder clusterBuilder;
+    private final Class<OUT> pojoClass;
+    private final String query;
+    private final String keyspace;
+    private final String table;
+    private final MapperOptions mapperOptions;
+
+    private final long maxSplitMemorySize;
+    private static final long MIN_SPLIT_MEMORY_SIZE = MemorySize.ofMebiBytes(10).getBytes();
+    static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = MemorySize.ofMebiBytes(64).getBytes();
+
+    public CassandraSource(
+            ClusterBuilder clusterBuilder,
+            Class<OUT> pojoClass,
+            String query,
+            MapperOptions mapperOptions) {
+        this(clusterBuilder, MAX_SPLIT_MEMORY_SIZE_DEFAULT, pojoClass, query, mapperOptions);
+    }
+
+    public CassandraSource(
+            ClusterBuilder clusterBuilder,
+            long maxSplitMemorySize,
+            Class<OUT> pojoClass,
+            String query,
+            MapperOptions mapperOptions) {
+        checkNotNull(clusterBuilder, "ClusterBuilder required but not 
provided");
+        checkNotNull(pojoClass, "POJO class required but not provided");
+        checkNotNull(query, "query required but not provided");
+        checkState(
+                maxSplitMemorySize >= MIN_SPLIT_MEMORY_SIZE,
+                "Defined maxSplitMemorySize (%s) is below minimum (%s)",
+                maxSplitMemorySize,
+                MIN_SPLIT_MEMORY_SIZE);
+        this.maxSplitMemorySize = maxSplitMemorySize;
+        final Matcher queryMatcher = checkQueryValidity(query);
+        this.query = query;
+        this.keyspace = queryMatcher.group(1);
+        this.table = queryMatcher.group(2);
+        this.clusterBuilder = clusterBuilder;
+        ClosureCleaner.clean(clusterBuilder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        this.pojoClass = pojoClass;
+        this.mapperOptions = mapperOptions;
+    }
+
+    @VisibleForTesting
+    public static Matcher checkQueryValidity(String query) {
+        checkState(
+                !query.matches(CQL_PROHIBITED_CLAUSES_REGEXP.pattern()),
+                "Aggregations/OrderBy are not supported because the query is 
executed on subsets/partitions of the input table");
+        final Matcher queryMatcher = SELECT_REGEXP.matcher(query);
+        checkState(
+                queryMatcher.matches(),
+                "Query must be of the form select ... from keyspace.table 
...;");
+        return queryMatcher;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Internal
+    @Override
+    public SourceReader<OUT, CassandraSplit> createReader(SourceReaderContext readerContext) {
+        return new CassandraSourceReaderFactory<OUT>()
+                .create(
+                        readerContext,
+                        clusterBuilder,
+                        pojoClass,
+                        query,
+                        keyspace,
+                        table,
+                        mapperOptions);
+    }
+
+    @Internal
+    @Override
+    public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<CassandraSplit> enumContext) {
+        return new CassandraSplitEnumerator(
+                enumContext, null, clusterBuilder, maxSplitMemorySize, keyspace, table);
+    }
+
+    @Internal
+    @Override
+    public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<CassandraSplit> enumContext,
+            CassandraEnumeratorState enumCheckpoint) {
+        return new CassandraSplitEnumerator(
+                enumContext, enumCheckpoint, clusterBuilder, maxSplitMemorySize, keyspace, table);
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<CassandraSplit> getSplitSerializer() {
+        return CassandraSplitSerializer.INSTANCE;
+    }
+
+    @Internal
+    @Override
+    public SimpleVersionedSerializer<CassandraEnumeratorState> getEnumeratorCheckpointSerializer() {
+        return CassandraEnumeratorStateSerializer.INSTANCE;
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return TypeInformation.of(pojoClass);
+    }
+}
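
A short sketch of the query validation added above: checkQueryValidity is public and
@VisibleForTesting, so it can be exercised directly. The sketch class and the ks.tbl
keyspace/table names are hypothetical, for illustration only.

    import org.apache.flink.connector.cassandra.source.CassandraSource;

    public class QueryValiditySketch {
        public static void main(String[] args) {
            // accepted: plain SELECT on keyspace.table ending with ';'
            CassandraSource.checkQueryValidity("select field1 from ks.tbl;");

            // rejected: aggregates computed per token sub-range would be globally wrong
            try {
                CassandraSource.checkQueryValidity("select count(*) from ks.tbl;");
            } catch (IllegalStateException expected) {
                // "Aggregations/OrderBy are not supported ..."
            }

            // rejected: missing the keyspace.table form (and the trailing ';')
            try {
                CassandraSource.checkQueryValidity("select field1 from tbl");
            } catch (IllegalStateException expected) {
                // "Query must be of the form select ... from keyspace.table ...;"
            }
        }
    }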
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java
new file mode 100644
index 0000000..c16fe13
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Queue;
+
+/**
+ * State for {@link CassandraSplitEnumerator}. It stores the offset ({@code startToken}) of the last
+ * lazy {@link CassandraSplit} generation and the number of splits left to generate. Upon
+ * restoration of this state, {@link SplitsGenerator#prepareSplits()} is not re-run, so we also
+ * need to store the result of this initial splits preparation ({@code increment} and {@code
+ * maxToken}).
+ */
+public class CassandraEnumeratorState {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraEnumeratorState.class);
+    private long numSplitsLeftToGenerate;
+    private BigInteger increment;
+    private BigInteger startToken;
+    private BigInteger maxToken;
+    // splits that were assigned to a failed reader and that were not part of a checkpoint, so after
+    // restoration, they need to be reassigned.
+    private final Queue<CassandraSplit> splitsToReassign;
+
+    CassandraEnumeratorState() {
+        this.splitsToReassign = new ArrayDeque<>();
+    }
+
+    public CassandraEnumeratorState(
+            long numSplitsLeftToGenerate,
+            BigInteger increment,
+            BigInteger startToken,
+            BigInteger maxToken,
+            Queue<CassandraSplit> splitsToReassign) {
+        this.numSplitsLeftToGenerate = numSplitsLeftToGenerate;
+        this.increment = increment;
+        this.startToken = startToken;
+        this.maxToken = maxToken;
+        this.splitsToReassign = splitsToReassign;
+    }
+
+    Queue<CassandraSplit> getSplitsToReassign() {
+        return splitsToReassign;
+    }
+
+    public long getNumSplitsLeftToGenerate() {
+        return numSplitsLeftToGenerate;
+    }
+
+    BigInteger getIncrement() {
+        return increment;
+    }
+
+    BigInteger getStartToken() {
+        return startToken;
+    }
+
+    BigInteger getMaxToken() {
+        return maxToken;
+    }
+
+    void addSplitsBack(Collection<CassandraSplit> splits) {
+        LOG.info(
+                "Add {} splits back to CassandraSplitEnumerator for 
reassignment after failover",
+                splits.size());
+        splitsToReassign.addAll(splits);
+    }
+
+    /**
+     * Provide a {@link CassandraSplit} that was assigned to a failed reader or lazily create one.
+     * Splits contain a range of the Cassandra ring of {@code maxSplitMemorySize}. There is no way
+     * to estimate the size of the data with the optional CQL filters without reading the data, so
+     * the split can be smaller than {@code maxSplitMemorySize} when the query is actually executed.
+     */
+    public @Nullable CassandraSplit getNextSplit() {
+        // serve splits to reassign first
+        final CassandraSplit splitToReassign = splitsToReassign.poll();
+        if (splitToReassign != null) {
+            return splitToReassign;
+        } // else no more splits to reassign, generate one
+        if (numSplitsLeftToGenerate == 0) {
+            return null; // enumerator will send the no more splits message to the requesting reader
+        }
+        BigInteger endToken =
+                numSplitsLeftToGenerate == 1
+                        // last split to generate, round up to the last token of the ring
+                        ? maxToken
+                        : startToken.add(increment);
+        CassandraSplit split = new CassandraSplit(startToken, endToken);
+        // prepare for next call
+        this.startToken = endToken;
+        numSplitsLeftToGenerate--;
+        return split;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CassandraEnumeratorState that = (CassandraEnumeratorState) o;
+        if (this.splitsToReassign.size() != that.splitsToReassign.size()) {
+            return false;
+        }
+        for (CassandraSplit cassandraSplit : splitsToReassign) {
+            if (!that.splitsToReassign.contains(cassandraSplit)) {
+                return false;
+            }
+        }
+        return numSplitsLeftToGenerate == that.numSplitsLeftToGenerate
+                && increment.equals(that.increment)
+                && startToken.equals(that.startToken)
+                && maxToken.equals(that.maxToken);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign);
+    }
+
+    @Override
+    public String toString() {
+        return "CassandraEnumeratorState{"
+                + "numSplitsLeftToGenerate="
+                + numSplitsLeftToGenerate
+                + ", increment="
+                + increment
+                + ", startToken="
+                + startToken
+                + ", maxToken="
+                + maxToken
+                + ", splitsToReassign="
+                + splitsToReassign
+                + '}';
+    }
+}
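
The lazy generation above can be exercised without a cluster. A minimal sketch, assuming a
toy ring of tokens 0..100 cut into 4 splits (illustrative values, not real Murmur3 tokens;
the sketch class name is hypothetical):

    import java.math.BigInteger;
    import java.util.ArrayDeque;

    import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
    import org.apache.flink.connector.cassandra.source.split.CassandraSplit;

    public class LazySplitsSketch {
        public static void main(String[] args) {
            // 4 splits left to generate, increment 25, start token 0, ring max 100,
            // no splits awaiting reassignment
            CassandraEnumeratorState state =
                    new CassandraEnumeratorState(
                            4,
                            BigInteger.valueOf(25),
                            BigInteger.ZERO,
                            BigInteger.valueOf(100),
                            new ArrayDeque<>());
            CassandraSplit split;
            while ((split = state.getNextSplit()) != null) {
                // prints (0,25) (25,50) (50,75) and finally (75,100): the last
                // split is rounded up to maxToken
                System.out.println(split);
            }
        }
    }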
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
new file mode 100644
index 0000000..3725d47
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer;
+import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/** Serializer for {@link CassandraEnumeratorState}. */
+public class CassandraEnumeratorStateSerializer
+        implements SimpleVersionedSerializer<CassandraEnumeratorState> {
+
+    public static final CassandraEnumeratorStateSerializer INSTANCE =
+            new CassandraEnumeratorStateSerializer();
+    public static final int CURRENT_VERSION = 0;
+
+    private CassandraEnumeratorStateSerializer() {}
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException {
+        try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+                final ObjectOutputStream objectOutputStream =
+                        new ObjectOutputStream(byteArrayOutputStream)) {
+            final Queue<CassandraSplit> splitsToReassign =
+                    cassandraEnumeratorState.getSplitsToReassign();
+            objectOutputStream.writeInt(splitsToReassign.size());
+            for (CassandraSplit cassandraSplit : splitsToReassign) {
+                final byte[] serializedSplit =
+                        CassandraSplitSerializer.INSTANCE.serialize(cassandraSplit);
+                objectOutputStream.writeInt(serializedSplit.length);
+                objectOutputStream.write(serializedSplit);
+            }
+
+            objectOutputStream.writeLong(cassandraEnumeratorState.getNumSplitsLeftToGenerate());
+            BigIntegerSerializationUtils.write(
+                    cassandraEnumeratorState.getIncrement(), objectOutputStream);
+            BigIntegerSerializationUtils.write(
+                    cassandraEnumeratorState.getStartToken(), objectOutputStream);
+            BigIntegerSerializationUtils.write(
+                    cassandraEnumeratorState.getMaxToken(), objectOutputStream);
+
+            objectOutputStream.flush();
+            return byteArrayOutputStream.toByteArray();
+        }
+    }
+
+    @Override
+    public CassandraEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
+        try (final ByteArrayInputStream byteArrayInputStream =
+                        new ByteArrayInputStream(serialized);
+                final ObjectInputStream objectInputStream =
+                        new ObjectInputStream(byteArrayInputStream)) {
+            final Queue<CassandraSplit> splitsToReassign = new ArrayDeque<>();
+            final int splitsToReassignSize = objectInputStream.readInt();
+            for (int i = 0; i < splitsToReassignSize; i++) {
+                final int splitSize = objectInputStream.readInt();
+                final byte[] splitBytes = new byte[splitSize];
+                IOUtils.readFully(objectInputStream, splitBytes, 0, splitSize);
+                final CassandraSplit split =
+                        CassandraSplitSerializer.INSTANCE.deserialize(
+                                CassandraSplitSerializer.CURRENT_VERSION, splitBytes);
+                splitsToReassign.add(split);
+            }
+
+            final long numSplitsLeftToGenerate = objectInputStream.readLong();
+            final BigInteger increment = BigIntegerSerializationUtils.read(objectInputStream);
+            final BigInteger startToken = BigIntegerSerializationUtils.read(objectInputStream);
+            final BigInteger maxToken = BigIntegerSerializationUtils.read(objectInputStream);
+
+            return new CassandraEnumeratorState(
+                    numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign);
+        }
+    }
+}
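
A round-trip sketch for the serializer above (hypothetical values and class name; the
custom equals of CassandraEnumeratorState compares the fields and the queue contents, so
the printed comparison holds):

    import java.math.BigInteger;
    import java.util.ArrayDeque;
    import java.util.Queue;

    import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
    import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer;
    import org.apache.flink.connector.cassandra.source.split.CassandraSplit;

    public class EnumeratorStateRoundTripSketch {
        public static void main(String[] args) throws Exception {
            Queue<CassandraSplit> toReassign = new ArrayDeque<>();
            toReassign.add(new CassandraSplit(BigInteger.ZERO, BigInteger.TEN));
            CassandraEnumeratorState state =
                    new CassandraEnumeratorState(
                            2,
                            BigInteger.valueOf(5),
                            BigInteger.TEN,
                            BigInteger.valueOf(20),
                            toReassign);

            CassandraEnumeratorStateSerializer serializer =
                    CassandraEnumeratorStateSerializer.INSTANCE;
            byte[] bytes = serializer.serialize(state);
            CassandraEnumeratorState restored =
                    serializer.deserialize(serializer.getVersion(), bytes);
            System.out.println(state.equals(restored)); // true
        }
    }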
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java
new file mode 100644
index 0000000..1337846
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+
+/** {@link SplitEnumerator} that splits the Cassandra cluster into {@link CassandraSplit}s. */
+public final class CassandraSplitEnumerator
+        implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<CassandraSplit> enumeratorContext;
+    private CassandraEnumeratorState state;
+    private final Cluster cluster;
+    private final Long maxSplitMemorySize;
+    private final Session session;
+    private final String keyspace;
+    private final String table;
+
+    public CassandraSplitEnumerator(
+            SplitEnumeratorContext<CassandraSplit> enumeratorContext,
+            CassandraEnumeratorState state,
+            ClusterBuilder clusterBuilder,
+            Long maxSplitMemorySize,
+            String keyspace,
+            String table) {
+        this.enumeratorContext = enumeratorContext;
+        this.state = state == null ? new CassandraEnumeratorState() : state /* snapshot restore */;
+        this.cluster = clusterBuilder.getCluster();
+        this.maxSplitMemorySize = maxSplitMemorySize;
+        this.session = cluster.newSession();
+        this.keyspace = keyspace;
+        this.table = table;
+    }
+
+    @Override
+    public void start() {
+        enumeratorContext.callAsync(
+                this::prepareSplits,
+                (preparedState, throwable) -> {
+                    if (throwable != null) {
+                        // guard against NPE: preparedState is null when the async call failed
+                        LOG.error("Failed to prepare splits", throwable);
+                        return;
+                    }
+                    LOG.debug("Initialized CassandraEnumeratorState: {}", preparedState);
+                    state = preparedState;
+                });
+    }
+
+    private CassandraEnumeratorState prepareSplits() {
+        final int parallelism = enumeratorContext.currentParallelism();
+        final String partitionerName = cluster.getMetadata().getPartitioner();
+        final SplitsGenerator.CassandraPartitioner partitioner =
+                partitionerName.contains(MURMUR3PARTITIONER.getClassName())
+                        ? MURMUR3PARTITIONER
+                        : RANDOMPARTITIONER;
+        final SplitsGenerator splitsGenerator =
+                new SplitsGenerator(
+                        partitioner, session, keyspace, table, parallelism, maxSplitMemorySize);
+        return splitsGenerator.prepareSplits();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        checkReaderRegistered(subtaskId);
+        final CassandraSplit cassandraSplit = state.getNextSplit();
+        if (cassandraSplit != null) {
+            LOG.info("Assigning splits to reader {}", subtaskId);
+            enumeratorContext.assignSplit(cassandraSplit, subtaskId);
+        } else {
+            LOG.info(
+                    "No split assigned to reader {} because the enumerator has 
no unassigned split left. Sending NoMoreSplitsEvent to reader",
+                    subtaskId);
+            enumeratorContext.signalNoMoreSplits(subtaskId);
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<CassandraSplit> splits, int subtaskId) {
+        // splits that were assigned to a failed reader and that were not part of a checkpoint, so
+        // after restoration, they need to be reassigned
+        state.addSplitsBack(splits);
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        // nothing to do on reader registration as the CassandraSplits are generated lazily
+    }
+
+    private void checkReaderRegistered(int readerId) {
+        if (!enumeratorContext.registeredReaders().containsKey(readerId)) {
+            throw new IllegalStateException(
+                    String.format("Reader %d is not registered to source 
coordinator", readerId));
+        }
+    }
+
+    @Override
+    public CassandraEnumeratorState snapshotState(long checkpointId) {
+        return state;
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
+            }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
+        }
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java
new file mode 100644
index 0000000..8287057
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ExecutionInfo;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * {@link RecordEmitter} that converts the {@link CassandraRow} read by the {@link
+ * CassandraSplitReader} to the specified POJO and outputs it. This class uses the Cassandra driver
+ * mapper to map the row to the POJO.
+ *
+ * @param <OUT> type of POJO record to output
+ */
+class CassandraRecordEmitter<OUT> implements RecordEmitter<CassandraRow, OUT, CassandraSplit> {
+
+    private final Function<ResultSet, OUT> map;
+
+    public CassandraRecordEmitter(Function<ResultSet, OUT> map) {
+        this.map = map;
+    }
+
+    @Override
+    public void emitRecord(
+            CassandraRow cassandraRow, SourceOutput<OUT> output, CassandraSplit cassandraSplit) {
+        // Mapping from a row to a Class<OUT> is a complex operation involving reflection API.
+        // It is better to use Cassandra mapper for it.
+        // But the mapper takes only a resultSet as input hence forging one containing only the Row
+        ResultSet resultSet = new SingleRowResultSet(cassandraRow);
+        // output the pojo based on the cassandraRow
+        output.collect(map.apply(resultSet));
+    }
+
+    private static class SingleRowResultSet implements ResultSet {
+        private final CassandraRow cassandraRow;
+        private final Row row;
+
+        private SingleRowResultSet(CassandraRow cassandraRow) {
+            this.cassandraRow = cassandraRow;
+            this.row = cassandraRow.getRow();
+        }
+
+        @Override
+        public Row one() {
+            return row;
+        }
+
+        @Override
+        public ColumnDefinitions getColumnDefinitions() {
+            return row.getColumnDefinitions();
+        }
+
+        @Override
+        public boolean wasApplied() {
+            return true;
+        }
+
+        @Override
+        public boolean isExhausted() {
+            return true;
+        }
+
+        @Override
+        public boolean isFullyFetched() {
+            return true;
+        }
+
+        @Override
+        public int getAvailableWithoutFetching() {
+            return 1;
+        }
+
+        @Override
+        public ListenableFuture<ResultSet> fetchMoreResults() {
+            return Futures.immediateFuture(null);
+        }
+
+        @Override
+        public List<Row> all() {
+            return Collections.singletonList(row);
+        }
+
+        @Override
+        public Iterator<Row> iterator() {
+            return new Iterator<Row>() {
+
+                @Override
+                public boolean hasNext() {
+                    return true;
+                }
+
+                @Override
+                public Row next() {
+                    return row;
+                }
+            };
+        }
+
+        @Override
+        public ExecutionInfo getExecutionInfo() {
+            return cassandraRow.getExecutionInfo();
+        }
+
+        @Override
+        public List<ExecutionInfo> getAllExecutionInfo() {
+            return Collections.singletonList(cassandraRow.getExecutionInfo());
+        }
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java
new file mode 100644
index 0000000..11c078e
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import com.datastax.driver.core.ExecutionInfo;
+import com.datastax.driver.core.Row;
+
+/**
+ * Wrapper for Cassandra {@link Row} that stores the {@link ExecutionInfo} statistics about the
+ * query execution that produced this row. {@link ExecutionInfo} is useful for using the
+ * Cassandra mapper during row translation to POJO.
+ */
+public class CassandraRow {
+
+    private final Row row;
+    private final ExecutionInfo executionInfo;
+
+    public CassandraRow(Row row, ExecutionInfo executionInfo) {
+        this.row = row;
+        this.executionInfo = executionInfo;
+    }
+
+    public Row getRow() {
+        return row;
+    }
+
+    public ExecutionInfo getExecutionInfo() {
+        return executionInfo;
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java
new file mode 100644
index 0000000..66eefcb
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Cassandra {@link SourceReader} that reads one {@link CassandraSplit} using a single thread.
+ *
+ * @param <OUT> the type of elements produced by the source
+ */
+class CassandraSourceReader<OUT>
+        extends SingleThreadMultiplexSourceReaderBase<
+                CassandraRow, OUT, CassandraSplit, CassandraSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraSourceReader.class);
+
+    private final Cluster cluster;
+    private final Session session;
+
+    // created by the factory
+    CassandraSourceReader(
+            SourceReaderContext context,
+            String query,
+            String keyspace,
+            String table,
+            Cluster cluster,
+            Session session,
+            Mapper<OUT> mapper) {
+        super(
+                () -> new CassandraSplitReader(cluster, session, query, keyspace, table),
+                new CassandraRecordEmitter<>(resultSet -> mapper.map(resultSet).one()),
+                context.getConfiguration(),
+                context);
+        this.cluster = cluster;
+        this.session = session;
+    }
+
+    @Override
+    public void start() {
+        context.sendSplitRequest();
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, CassandraSplit> finishedSplitIds) {
+        context.sendSplitRequest();
+    }
+
+    @Override
+    protected CassandraSplit initializedState(CassandraSplit cassandraSplit) {
+        return cassandraSplit;
+    }
+
+    @Override
+    protected CassandraSplit toSplitType(String splitId, CassandraSplit cassandraSplit) {
+        return cassandraSplit;
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        try {
+            if (session != null) {
+                session.close();
+            }
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
+            }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
+        }
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java
new file mode 100644
index 0000000..3d06097
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+
+/**
+ * Factory to create {@link CassandraSourceReader}s and allow sharing the cluster and the session
+ * objects.
+ */
+public class CassandraSourceReaderFactory<OUT> {
+    public CassandraSourceReader<OUT> create(
+            SourceReaderContext context,
+            ClusterBuilder clusterBuilder,
+            Class<OUT> pojoClass,
+            String query,
+            String keyspace,
+            String table,
+            MapperOptions mapperOptions) {
+        Cluster cluster = clusterBuilder.getCluster();
+        Session session = cluster.connect();
+        Mapper<OUT> mapper = new MappingManager(session).mapper(pojoClass);
+        if (mapperOptions != null) {
+            Mapper.Option[] optionsArray = mapperOptions.getMapperOptions();
+            if (optionsArray != null) {
+                mapper.setDefaultGetOptions(optionsArray);
+            }
+        }
+        return new CassandraSourceReader<>(
+                context, query, keyspace, table, cluster, session, mapper);
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java
new file mode 100644
index 0000000..2344705
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.stream.Collectors;
+
+/**
+ * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as
+ * {@link CassandraRow}s. For that, it executes a range query (a query that outputs records
+ * belonging to a Cassandra token range) based on the user specified query.
+ */
+class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class);
+
+    private final Cluster cluster;
+    private final Session session;
+    private final Set<CassandraSplit> unprocessedSplits;
+    private final AtomicBoolean wakeup = new AtomicBoolean(false);
+    private final String query;
+    private final String keyspace;
+    private final String table;
+
+    public CassandraSplitReader(
+            Cluster cluster, Session session, String query, String keyspace, String table) {
+        this.unprocessedSplits = new HashSet<>();
+        this.query = query;
+        this.keyspace = keyspace;
+        this.table = table;
+        this.cluster = cluster;
+        this.session = session;
+    }
+
+    @Override
+    public RecordsWithSplitIds<CassandraRow> fetch() {
+        Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>();
+        Set<String> finishedSplits = new HashSet<>();
+
+        Metadata clusterMetadata = cluster.getMetadata();
+        String partitionKey = getPartitionKey(clusterMetadata);
+        String finalQuery = generateRangeQuery(query, partitionKey);
+        PreparedStatement preparedStatement = session.prepare(finalQuery);
+
+        // Set wakeup to false to start consuming
+        wakeup.compareAndSet(true, false);
+        for (CassandraSplit cassandraSplit : unprocessedSplits) {
+            // allow interrupting the reading of splits, especially the blocking session.execute()
+            // call, as requested in the API
+            if (wakeup.get()) {
+                break;
+            }
+            try {
+                Token startToken =
+                        clusterMetadata.newToken(cassandraSplit.getRingRangeStart().toString());
+                Token endToken =
+                        clusterMetadata.newToken(cassandraSplit.getRingRangeEnd().toString());
+                final ResultSet resultSet =
+                        session.execute(
+                                preparedStatement
+                                        .bind()
+                                        .setToken(0, startToken)
+                                        .setToken(1, endToken));
+                // add all the records of the split to the output (in memory).
+                // It is safe because each split has a configurable maximum memory size
+                addRecordsToOutput(resultSet, cassandraSplit, recordsBySplit);
+                // add the already read (or even empty) split to finished splits
+                finishedSplits.add(cassandraSplit.splitId());
+                // for reentrant calls: if fetch is restarted,
+                // do not reprocess the already processed splits
+                unprocessedSplits.remove(cassandraSplit);
+            } catch (Exception ex) {
+                LOG.error("Error while reading split ", ex);
+            }
+        }
+        return new RecordsBySplits<>(recordsBySplit, finishedSplits);
+    }
+
+    private String getPartitionKey(Metadata clusterMetadata) {
+        return clusterMetadata.getKeyspace(keyspace).getTable(table).getPartitionKey().stream()
+                .map(ColumnMetadata::getName)
+                .collect(Collectors.joining(","));
+    }
+
+    @Override
+    public void wakeUp() {
+        wakeup.compareAndSet(false, true);
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChanges) {
+        unprocessedSplits.addAll(splitsChanges.splits());
+    }
+
+    /**
+     * Utility method to add the ring token filtering clauses to the user query to generate the
+     * split query. For example:
+     *
+     * <ul>
+     *   <li><code>"select * from
+     *       keyspace.table where field1=value1;"</code> will be transformed into <code>
+     *       "select * from
+     *       keyspace.table where (token(partitionKey) >= ?) AND (token(partitionKey) < ?) AND
+     *       field1=value1;"</code>
+     *   <li><code>"select * from
+     *       keyspace.table;"</code> will be transformed into <code>
+     *       "select * from keyspace.table WHERE
+     *       (token(%s) >= ?) AND (token(%s) < ?);"</code>
+     * </ul>
+     *
+     * @param query the user input query
+     * @param partitionKey Cassandra partition key of the user provided table
+     * @return the final split query that will be sent to the Cassandra cluster
+     */
+    @VisibleForTesting
+    static String generateRangeQuery(String query, String partitionKey) {
+        Matcher queryMatcher = CassandraSource.SELECT_REGEXP.matcher(query);
+        if (!queryMatcher.matches()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Failed to generate range query out of the 
provided query: %s", query));
+        }
+        final int whereIndex = query.toLowerCase().indexOf("where");
+        int insertionPoint;
+        String filter;
+        if (whereIndex != -1) {
+            insertionPoint = whereIndex + "where".length();
+            filter =
+                    String.format(
+                            " (token(%s) >= ?) AND (token(%s) < ?) AND",
+                            partitionKey, partitionKey);
+        } else {
+            // end of keyspace.table
+            insertionPoint = queryMatcher.end(2);
+            filter =
+                    String.format(
+                            " WHERE (token(%s) >= ?) AND (token(%s) < ?)",
+                            partitionKey, partitionKey);
+        }
+        return String.format(
+                "%s%s%s",
+                query.substring(0, insertionPoint), filter, query.substring(insertionPoint));
+    }
+
+    /**
+     * This method populates the {@code Map<String, Collection<CassandraRow>> recordsBySplit} map
+     * that is used to create the {@link RecordsBySplits} that are output by the fetch method. It
+     * modifies its {@code output} parameter.
+     */
+    private void addRecordsToOutput(
+            ResultSet resultSet,
+            CassandraSplit cassandraSplit,
+            Map<String, Collection<CassandraRow>> output) {
+        resultSet.forEach(
+                row ->
+                        output.computeIfAbsent(cassandraSplit.splitId(), id -> new ArrayList<>())
+                                .add(new CassandraRow(row, resultSet.getExecutionInfo())));
+    }
+
+    @Override
+    public void close() throws Exception {
+        // nothing to do as the cluster/session is managed by the SourceReader
+    }
+}
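
The query rewriting in generateRangeQuery can be checked in isolation. A minimal sketch,
placed in the same package because both CassandraSplitReader and the method are
package-private ("pk" and ks.tbl are hypothetical names):

    package org.apache.flink.connector.cassandra.source.reader;

    public class RangeQuerySketch {
        public static void main(String[] args) {
            // no WHERE clause: the token filter becomes the WHERE clause
            System.out.println(
                    CassandraSplitReader.generateRangeQuery("select * from ks.tbl;", "pk"));
            // -> select * from ks.tbl WHERE (token(pk) >= ?) AND (token(pk) < ?);

            // existing WHERE clause: the token filter is AND-ed in front of the user filter
            System.out.println(
                    CassandraSplitReader.generateRangeQuery(
                            "select * from ks.tbl where field1=1;", "pk"));
            // -> select * from ks.tbl where (token(pk) >= ?) AND (token(pk) < ?) AND field1=1;
        }
    }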
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java
new file mode 100644
index 0000000..556b87d
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+
+/**
+ * Immutable {@link SourceSplit} for Cassandra source. A Cassandra split is a slice of the Cassandra
+ * tokens ring (i.e. a ringRange).
+ */
+public class CassandraSplit implements SourceSplit, Serializable {
+
+    private final BigInteger ringRangeStart;
+    private final BigInteger ringRangeEnd;
+
+    public CassandraSplit(BigInteger ringRangeStart, BigInteger ringRangeEnd) {
+        this.ringRangeStart = ringRangeStart;
+        this.ringRangeEnd = ringRangeEnd;
+    }
+
+    public BigInteger getRingRangeStart() {
+        return ringRangeStart;
+    }
+
+    public BigInteger getRingRangeEnd() {
+        return ringRangeEnd;
+    }
+
+    @Override
+    public String splitId() {
+        return String.format("(%s,%s)", ringRangeStart.toString(), 
ringRangeEnd.toString());
+    }
+
+    @Override
+    public String toString() {
+        return splitId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CassandraSplit other = (CassandraSplit) o;
+        return ringRangeStart.equals(other.ringRangeStart)
+                && ringRangeEnd.equals(other.ringRangeEnd);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * ringRangeStart.hashCode() + ringRangeEnd.hashCode();
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
new file mode 100644
index 0000000..74fa573
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+/** Serializer for {@link CassandraSplit}. */
+public class CassandraSplitSerializer implements SimpleVersionedSerializer<CassandraSplit> {
+
+    public static final CassandraSplitSerializer INSTANCE = new CassandraSplitSerializer();
+    private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
+            ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
+
+    public static final int CURRENT_VERSION = 0;
+
+    private CassandraSplitSerializer() {}
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(CassandraSplit cassandraSplit) throws IOException {
+        final DataOutputSerializer out = SERIALIZER_CACHE.get();
+        BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeStart(), out);
+        BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeEnd(), out);
+        final byte[] result = out.getCopyOfBuffer();
+        out.clear();
+        return result;
+    }
+
+    @Override
+    public CassandraSplit deserialize(int version, byte[] serialized) throws IOException {
+        final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
+        final BigInteger ringRangeStart = BigIntegerSerializationUtils.read(in);
+        final BigInteger ringRangeEnd = BigIntegerSerializationUtils.read(in);
+        return new CassandraSplit(ringRangeStart, ringRangeEnd);
+    }
+}
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
new file mode 100644
index 0000000..e44be34
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class prepares the generation of {@link CassandraSplit}s based on Cassandra cluster
+ * partitioner and cluster statistics. It estimates the total size of the table using Cassandra
+ * system table system.size_estimates.
+ */
+public final class SplitsGenerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class);
+
+    private final CassandraPartitioner partitioner;
+    private final Session session;
+    private final String keyspace;
+    private final String table;
+    private final int parallelism;
+    private final long maxSplitMemorySize;
+
+    public SplitsGenerator(
+            CassandraPartitioner partitioner,
+            Session session,
+            String keyspace,
+            String table,
+            int parallelism,
+            long maxSplitMemorySize) {
+        this.partitioner = partitioner;
+        this.session = session;
+        this.keyspace = keyspace;
+        this.table = table;
+        this.parallelism = parallelism;
+        this.maxSplitMemorySize = maxSplitMemorySize;
+    }
+
+    /**
+     * Prepare the {@link CassandraEnumeratorState} for lazy generation of {@link CassandraSplit}s:
+     * calculate {@code numSplitsToGenerate} based on the estimated target table size and the
+     * provided {@code maxSplitMemorySize}, and calculate {@code increment}, which is the size of a
+     * split in tokens.
+     */
+    public CassandraEnumeratorState prepareSplits() {
+        final long numSplitsToGenerate = decideOnNumSplits();
+        final BigInteger increment =
+                partitioner.ringSize.divide(BigInteger.valueOf(numSplitsToGenerate));
+        final BigInteger startToken = partitioner.minToken;
+        return new CassandraEnumeratorState(
+                numSplitsToGenerate,
+                increment,
+                startToken,
+                partitioner.maxToken,
+                new ArrayDeque<>());
+    }
+
+    /**
+     * Determine {@code numSplits} based on the estimation of the target table size and the
+     * configured {@code maxSplitMemorySize}. Provide a fallback when the table size is unavailable
+     * or when too few splits would be calculated.
+     */
+    private long decideOnNumSplits() {
+        long numSplits;
+        final long estimateTableSize = estimateTableSize();
+        if (estimateTableSize == 0) { // size estimates unavailable
+            LOG.info(
+                    "Cassandra size estimates are not available for {}.{} 
table. Creating as many splits as parallelism ({})",
+                    keyspace,
+                    table,
+                    parallelism);
+            numSplits = parallelism;
+        } else { // create estimateTableSize / maxSplitMemorySize splits, or fall back to
+            // parallelism splits if that would yield too few splits
+            LOG.debug(
+                    "Estimated size for {}.{} table is {} bytes",
+                    keyspace,
+                    table,
+                    estimateTableSize);
+            numSplits =
+                    estimateTableSize / maxSplitMemorySize == 0
+                            ? parallelism
+                            : estimateTableSize / maxSplitMemorySize;
+            LOG.info(
+                    "maxSplitMemorySize set value ({}) leads to the creation 
of {} splits",
+                    maxSplitMemorySize,
+                    numSplits);
+        }
+        return numSplits;
+    }
+
+    /**
+     * Estimates the size of the table in bytes. Cassandra size estimates can be 0 if the data was
+     * just inserted and the amount of data in the table was small. This is a very common situation
+     * during tests.
+     */
+    @VisibleForTesting
+    public long estimateTableSize() {
+        List<TokenRange> tokenRanges = getTokenRangesOfTable();
+        long size = 0L;
+        for (TokenRange tokenRange : tokenRanges) {
+            size += tokenRange.meanPartitionSize * tokenRange.partitionCount;
+        }
+        final float ringFraction = getRingFraction(tokenRanges);
+        // ringFraction can be 0 if the size estimates are not available
+        return ringFraction != 0 ? Math.round(size / (double) ringFraction) : 0L;
+    }
+
+    /**
+     * The values that we get from system.size_estimates are for one node. We need to extrapolate
+     * to the whole cluster. This method estimates the fraction of the ring that the node
+     * represents in the cluster.
+     *
+     * @param tokenRanges The list of {@link TokenRange} to estimate
+     * @return The fraction the node represents in the whole cluster
+     */
+    private float getRingFraction(List<TokenRange> tokenRanges) {
+        BigInteger addressedTokens = BigInteger.ZERO;
+        for (TokenRange tokenRange : tokenRanges) {
+            addressedTokens =
+                    addressedTokens.add(distance(tokenRange.rangeStart, tokenRange.rangeEnd));
+        }
+        // it is < 1 because it is a percentage
+        return addressedTokens.divide(partitioner.ringSize).floatValue();
+    }
+
+    /** Gets the list of token ranges that the table occupies on a given Cassandra node. */
+    private List<TokenRange> getTokenRangesOfTable() {
+        ResultSet resultSet =
+                session.execute(
+                        "SELECT range_start, range_end, partitions_count, 
mean_partition_size FROM "
+                                + "system.size_estimates WHERE keyspace_name = 
? AND table_name = ?",
+                        keyspace,
+                        table);
+
+        ArrayList<TokenRange> tokenRanges = new ArrayList<>();
+        for (Row row : resultSet) {
+            TokenRange tokenRange =
+                    new TokenRange(
+                            row.getLong("partitions_count"),
+                            row.getLong("mean_partition_size"),
+                            row.getString("range_start"),
+                            row.getString("range_end"));
+            tokenRanges.add(tokenRange);
+        }
+        // The table may not contain the estimates yet
+        // or have partitions_count and mean_partition_size fields = 0
+        // if the data was just inserted and the amount of data in the table was small.
+        // This is a very common situation during tests,
+        // when we insert a few rows and immediately query them.
+        // However, for tiny data sets the lack of size estimates is not a problem at all,
+        // because we don't want to split tiny data anyway.
+        // Therefore, we're not issuing a warning if the result set was empty
+        // or mean_partition_size and partitions_count = 0.
+        return tokenRanges;
+    }
+
+    /**
+     * Measure distance between two tokens.
+     *
+     * @param token1 The measure is symmetrical so token1 and token2 can be exchanged
+     * @param token2 The measure is symmetrical so token1 and token2 can be exchanged
+     * @return Number of tokens that separate token1 and token2
+     */
+    private BigInteger distance(BigInteger token1, BigInteger token2) {
+        // token2 > token1
+        if (token2.compareTo(token1) > 0) {
+            return token2.subtract(token1);
+        } else {
+            return token2.subtract(token1).add(partitioner.ringSize);
+        }
+    }
+
+    /** Enum to configure the SplitsGenerator based on Apache Cassandra partitioners. */
+    public enum CassandraPartitioner {
+        MURMUR3PARTITIONER(
+                "Murmur3Partitioner",
+                BigInteger.valueOf(2).pow(63).negate(),
+                BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE)),
+        RANDOMPARTITIONER(
+                "RandomPartitioner",
+                BigInteger.ZERO,
+                BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE));
+
+        private final BigInteger minToken;
+        private final BigInteger maxToken;
+        private final BigInteger ringSize;
+        private final String className;
+
+        CassandraPartitioner(String className, BigInteger minToken, BigInteger maxToken) {
+            this.className = className;
+            this.minToken = minToken;
+            this.maxToken = maxToken;
+            this.ringSize = maxToken.subtract(minToken).add(BigInteger.ONE);
+        }
+
+        public String getClassName() {
+            return className;
+        }
+    }
+
+    private static class TokenRange {
+        private final long partitionCount;
+        private final long meanPartitionSize;
+        private final BigInteger rangeStart;
+        private final BigInteger rangeEnd;
+
+        private TokenRange(
+                long partitionCount, long meanPartitionSize, String rangeStart, String rangeEnd) {
+            this.partitionCount = partitionCount;
+            this.meanPartitionSize = meanPartitionSize;
+            this.rangeStart = new BigInteger(rangeStart);
+            this.rangeEnd = new BigInteger(rangeEnd);
+        }
+    }
+}
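
To make the split arithmetic concrete, here is a self-contained sketch (plain JDK, no connector dependencies) that mirrors decideOnNumSplits, the increment computation of prepareSplits, and the wrap-around distance. The table size, maxSplitMemorySize, and token values are example assumptions, not defaults:

    import java.math.BigInteger;

    /** Sketch of the SplitsGenerator arithmetic; all input values are example assumptions. */
    public class SplitMathSketch {
        public static void main(String[] args) {
            // Murmur3 ring: [-2^63, 2^63 - 1], so ringSize = 2^64 (cf. CassandraPartitioner)
            BigInteger minToken = BigInteger.valueOf(2).pow(63).negate();
            BigInteger maxToken = BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE);
            BigInteger ringSize = maxToken.subtract(minToken).add(BigInteger.ONE);

            // example inputs: estimated table size and configured max split memory size
            long estimateTableSize = 35_840L;
            long maxSplitMemorySize = 10_000L;
            int parallelism = 2;

            // decideOnNumSplits: one split per maxSplitMemorySize bytes,
            // falling back to parallelism when that would yield 0 splits
            long numSplits =
                    estimateTableSize / maxSplitMemorySize == 0
                            ? parallelism
                            : estimateTableSize / maxSplitMemorySize;
            System.out.println("numSplits = " + numSplits); // 3

            // prepareSplits: each split spans ringSize / numSplits consecutive tokens
            BigInteger increment = ringSize.divide(BigInteger.valueOf(numSplits));
            System.out.println("tokens per split = " + increment);

            // distance(): wrap-around distance between two tokens on the ring
            BigInteger token1 = BigInteger.valueOf(10);
            BigInteger token2 = BigInteger.valueOf(-10);
            BigInteger distance =
                    token2.compareTo(token1) > 0
                            ? token2.subtract(token1)
                            : token2.subtract(token1).add(ringSize);
            System.out.println("distance = " + distance); // ringSize - 20
        }
    }
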
diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java
new file mode 100644
index 0000000..74f25d0
--- /dev/null
+++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigInteger;
+
+/** Utils for BigInteger reading and writing in serde context. */
+public class BigIntegerSerializationUtils {
+    public static void write(BigInteger bigInteger, DataOutput output) throws IOException {
+        final byte[] bigIntegerBytes = bigInteger.toByteArray();
+        output.writeInt(bigIntegerBytes.length);
+        output.write(bigIntegerBytes);
+    }
+
+    public static BigInteger read(DataInput input) throws IOException {
+        final int bigIntegerSize = input.readInt();
+        final byte[] bigIntegerBytes = new byte[bigIntegerSize];
+        input.readFully(bigIntegerBytes);
+        return new BigInteger(bigIntegerBytes);
+    }
+}
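
A quick round-trip of the length-prefixed encoding above; any DataInput/DataOutput pair works, here plain java.io streams (a sketch, the demo class is not part of the commit):

    import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.math.BigInteger;

    /** Round-trip sketch for the length-prefixed BigInteger encoding (hypothetical demo class). */
    public class BigIntegerSerdeSketch {
        public static void main(String[] args) throws Exception {
            BigInteger token = BigInteger.valueOf(2).pow(63).negate(); // Murmur3 min token

            // write: an int length prefix followed by the two's-complement bytes
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            BigIntegerSerializationUtils.write(token, new DataOutputStream(bytes));

            // read: consume the prefix, then exactly that many bytes
            BigInteger back =
                    BigIntegerSerializationUtils.read(
                            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(token.equals(back)); // true
        }
    }
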
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
index 764c001..441cc09 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
 import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
 import com.datastax.driver.core.Cluster;
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
new file mode 100644
index 0000000..83ecaae
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState;
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+import org.apache.flink.connector.cassandra.source.split.SplitsGenerator;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestTemplate;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory;
+import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER;
+import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER;
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the Cassandra source. */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(true);
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default 
Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT);
+        final CassandraEnumeratorState state = generator.prepareSplits();
+
+        // no maxSplitMemorySize specified, falling back to number of splits = parallelism
+        assertThat(state.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+
+        final CassandraSplit split1 = state.getNextSplit();
+        checkNotNull(split1, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split1.splitId()).isEqualTo("(-9223372036854775808,0)");
+
+        final CassandraSplit split2 = state.getNextSplit();
+        checkNotNull(split2, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split2.splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT);
+        final CassandraEnumeratorState state = generator.prepareSplits();
+
+        // no maxSplitMemorySize specified, falling back to number of splits = parallelism
+        assertThat(state.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+
+        final CassandraSplit split1 = state.getNextSplit();
+        checkNotNull(split1, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split1.splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+
+        final CassandraSplit split2 = state.getNextSplit();
+        checkNotNull(split2, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split2.splitId())
+                .isEqualTo(
+                        "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        final long maxSplitMemorySize = 10000L;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        maxSplitMemorySize);
+        final long tableSize = generator.estimateTableSize();
+        // sanity check to ensure that the size estimates were updated in the Cassandra cluster
+        assertThat(tableSize).isEqualTo(35840L);
+        final CassandraEnumeratorState cassandraEnumeratorState = generator.prepareSplits();
+        assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate())
+                // regular case
+                .isEqualTo(tableSize / maxSplitMemorySize);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooHighMaximumSplitSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 20;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        100_000_000L);
+        // sanity check to ensure that the size estimates were updated in the Cassandra cluster
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        final CassandraEnumeratorState cassandraEnumeratorState = generator.prepareSplits();
+        // maxSplitMemorySize is too high compared to the table size, falling back to parallelism
+        // splits; a too low maxSplitMemorySize is guarded by an assertion > min at source creation
+        assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+    }
+
+    // overridden to use unordered checks
+    @Override
+    protected void checkResultWithSemantic(
+            CloseableIterator<Pojo> resultIterator,
+            List<List<Pojo>> testData,
+            CheckpointingMode semantic,
+            Integer limit) {
+        if (limit != null) {
+            Runnable runnable =
+                    () ->
+                            CollectIteratorAssertions.assertUnordered(resultIterator)
+                                    .withNumRecordsLimit(limit)
+                                    .matchesRecordsFromSource(testData, semantic);
+
+            assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+        } else {
+            CollectIteratorAssertions.assertUnordered(resultIterator)
+                    .matchesRecordsFromSource(testData, semantic);
+        }
+    }
+
+    @Disabled("Not a unbounded source")
+    @Override
+    public void testSourceMetrics(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {}
+
+    @Disabled("Not a unbounded source")
+    @Override
+    public void testSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {}
+
+    @Disabled("Not a unbounded source")
+    @Override
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {}
+
+    @Disabled("Not a unbounded source")
+    @Override
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {}
+
+    @Disabled("Not a unbounded source")
+    @Override
+    public void testTaskManagerFailure(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            ClusterControllable controller,
+            CheckpointingMode semantic) {}
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
new file mode 100644
index 0000000..fb69f2b
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Junit {@link DataStreamSourceExternalContext} that contains everything related to Cassandra
+ * source test cases, especially test table management.
+ */
+public class CassandraTestContext implements DataStreamSourceExternalContext<Pojo> {
+
+    static final String TABLE_NAME = "batches";
+
+    private static final String CREATE_TABLE_QUERY =
+            "CREATE TABLE "
+                    + CassandraTestEnvironment.KEYSPACE
+                    + "."
+                    + TABLE_NAME
+                    + " (id text PRIMARY KEY, counter int, batch_id int)"
+                    + ";";
+
+    private static final String DROP_TABLE_QUERY =
+            "DROP TABLE " + CassandraTestEnvironment.KEYSPACE + "." + 
TABLE_NAME + ";";
+
+    private static final int RECORDS_PER_SPLIT = 20;
+
+    private final Mapper<Pojo> mapper;
+    private final MapperOptions mapperOptions;
+    private final ClusterBuilder clusterBuilder;
+    private final Session session;
+    private ExternalSystemSplitDataWriter<Pojo> splitDataWriter;
+
+    public CassandraTestContext(CassandraTestEnvironment cassandraTestEnvironment) {
+        clusterBuilder = cassandraTestEnvironment.getClusterBuilder();
+        session = cassandraTestEnvironment.getSession();
+        createTable();
+        mapper = new MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class);
+        mapperOptions = () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)};
+    }
+
+    @Override
+    public TypeInformation<Pojo> getProducedType() {
+        return TypeInformation.of(Pojo.class);
+    }
+
+    @Override
+    public List<URL> getConnectorJarPaths() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Source<Pojo, ?, ?> createSource(TestingSourceSettings 
sourceSettings)
+            throws UnsupportedOperationException {
+
+        return new CassandraSource<>(
+                clusterBuilder,
+                Pojo.class,
+                String.format(
+                        "SELECT * FROM %s.%s;", 
CassandraTestEnvironment.KEYSPACE, TABLE_NAME),
+                mapperOptions);
+    }
+
+    @Override
+    public ExternalSystemSplitDataWriter<Pojo> createSourceSplitDataWriter(
+            TestingSourceSettings sourceSettings) {
+        splitDataWriter =
+                new ExternalSystemSplitDataWriter<Pojo>() {
+
+                    @Override
+                    public void writeRecords(List<Pojo> records) {
+                        for (Pojo pojo : records) {
+                            mapper.save(pojo, mapperOptions.getMapperOptions());
+                        }
+                    }
+
+                    @Override
+                    public void close() {
+                        // nothing to do, cluster/session is shared at the CassandraTestEnvironment level
+                    }
+                };
+        return splitDataWriter;
+    }
+
+    @Override
+    public List<Pojo> generateTestData(
+            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
+        List<Pojo> testData = new ArrayList<>(RECORDS_PER_SPLIT);
+        // generate RECORDS_PER_SPLIT pojos per split and use splitId as pojo batchIndex so that
+        // pojos are considered equal when they belong to the same split
+        // as requested in implementation notes.
+        for (int i = 0; i < RECORDS_PER_SPLIT; i++) {
+            Pojo pojo = new Pojo(String.valueOf(seed + i), i, splitIndex);
+            testData.add(pojo);
+        }
+        return testData;
+    }
+
+    @Override
+    public void close() throws Exception {
+        dropTable();
+        // NB: cluster/session is shared at the CassandraTestEnvironment level
+    }
+
+    private void createTable() {
+        session.execute(CassandraTestEnvironment.requestWithTimeout(CREATE_TABLE_QUERY));
+    }
+
+    private void dropTable() {
+        session.execute(CassandraTestEnvironment.requestWithTimeout(DROP_TABLE_QUERY));
+    }
+
+    static class CassandraTestContextFactory
+            implements ExternalContextFactory<CassandraTestContext> {
+
+        private final CassandraTestEnvironment cassandraTestEnvironment;
+
+        public CassandraTestContextFactory(CassandraTestEnvironment cassandraTestEnvironment) {
+            this.cassandraTestEnvironment = cassandraTestEnvironment;
+        }
+
+        @Override
+        public CassandraTestContext createExternalContext(String testName) {
+            return new CassandraTestContext(cassandraTestEnvironment);
+        }
+    }
+}
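
For orientation, here is a hedged sketch of how an application could wire the new source into a DataStream job, mirroring the constructor call in createSource above. The contact point, query, and demo class name are placeholders, not part of the commit:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.cassandra.source.CassandraSource;
    import org.apache.flink.connectors.cassandra.utils.Pojo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
    import org.apache.flink.streaming.connectors.cassandra.MapperOptions;

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.mapping.Mapper;

    /** Hypothetical user job wiring up the new source. */
    public class CassandraSourceUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // same ClusterBuilder contract as in the tests: point the driver at the cluster
            ClusterBuilder clusterBuilder =
                    new ClusterBuilder() {
                        @Override
                        protected Cluster buildCluster(Cluster.Builder builder) {
                            return builder.addContactPoint("127.0.0.1").build(); // placeholder host
                        }
                    };
            MapperOptions mapperOptions =
                    () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)};

            // constructor as used by createSource above: builder, POJO class, query, mapper options
            CassandraSource<Pojo> source =
                    new CassandraSource<>(
                            clusterBuilder, Pojo.class, "SELECT * FROM flink.batches;", mapperOptions);

            DataStream<Pojo> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "CassandraSource");
            stream.print();
            env.execute("Cassandra source sketch");
        }
    }
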
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
new file mode 100644
index 0000000..24b9e60
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Junit test environment that contains everything needed at the test suite level: test container
+ * setup, keyspace setup, Cassandra cluster/session management, and ClusterBuilder setup.
+ */
+@Testcontainers
+public class CassandraTestEnvironment implements TestResource {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class);
+    private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8";
+    private static final int CQL_PORT = 9042;
+
+    private static final int READ_TIMEOUT_MILLIS = 36000;
+
+    // flushing memtables to SSTables is an asynchronous operation that may take a while
+    private static final long FLUSH_MEMTABLES_DELAY = 30_000L;
+
+    static final String KEYSPACE = "flink";
+
+    private static final String CREATE_KEYSPACE_QUERY =
+            "CREATE KEYSPACE "
+                    + KEYSPACE
+                    + " WITH replication= {'class':'SimpleStrategy', 
'replication_factor':1};";
+
+    static final String SPLITS_TABLE = "flinksplits";
+    private static final String CREATE_SPLITS_TABLE_QUERY =
+            "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int 
PRIMARY KEY, counter int);";
+    private static final String INSERT_INTO_FLINK_SPLITS =
+            "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" 
+ " VALUES (%d, %d)";
+    private static final int NB_SPLITS_RECORDS = 1000;
+
+    @Container private final CassandraContainer cassandraContainer;
+
+    boolean insertTestDataForSplitSizeTests;
+    private Cluster cluster;
+    private Session session;
+    private ClusterBuilder clusterBuilder;
+
+    public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) {
+        this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests;
+        cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE);
+        // more generous timeouts
+        addJavaOpts(
+                cassandraContainer,
+                "-Dcassandra.request_timeout_in_ms=30000",
+                "-Dcassandra.read_request_timeout_in_ms=15000",
+                "-Dcassandra.write_request_timeout_in_ms=6000");
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        startEnv();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        stopEnv();
+    }
+
+    private static void addJavaOpts(GenericContainer<?> container, String... opts) {
+        String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+        container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " 
"));
+    }
+
+    private void startEnv() throws Exception {
+        // configure container start to wait until cassandra is ready to receive queries
+        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+        // start with retries
+        cassandraContainer.start();
+        cassandraContainer.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cluster = cassandraContainer.getCluster();
+        clusterBuilder =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ONE,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
+
+        session = cluster.connect();
+        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        // create a dedicated table for split size tests (to avoid having to flush with each test)
+        if (insertTestDataForSplitSizeTests) {
+            insertTestDataForSplitSizeTests();
+        }
+    }
+
+    private void insertTestDataForSplitSizeTests() throws Exception {
+        session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+        for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
+            session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i)));
+        }
+        flushMemTables(SPLITS_TABLE);
+    }
+
+    private void stopEnv() {
+
+        if (session != null) {
+            session.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+        cassandraContainer.stop();
+    }
+
+    private ClusterBuilder createBuilderWithConsistencyLevel(
+            ConsistencyLevel consistencyLevel, String host, int port) {
+        return new ClusterBuilder() {
+            @Override
+            protected Cluster buildCluster(Cluster.Builder builder) {
+                return builder.addContactPointsWithPorts(new InetSocketAddress(host, port))
+                        .withQueryOptions(
+                                new QueryOptions()
+                                        .setConsistencyLevel(consistencyLevel)
+                                        .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                        .withSocketOptions(
+                                new SocketOptions()
+                                        // default timeout x 3
+                                        .setConnectTimeoutMillis(15000)
+                                        // default timeout x3 and higher than
+                                        // request_timeout_in_ms at the cluster level
+                                        .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+                        .withoutJMXReporting()
+                        .withoutMetrics()
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Force the flush of Cassandra memtables to SSTables in order to update size_estimates. It is
+     * needed for the tests because we just inserted records and need to force Cassandra to update
+     * the size_estimates system table.
+     */
+    void flushMemTables(String table) throws Exception {
+        cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, table);
+        Thread.sleep(FLUSH_MEMTABLES_DELAY);
+    }
+
+    static Statement requestWithTimeout(String query) {
+        return new SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS);
+    }
+
+    public ClusterBuilder getClusterBuilder() {
+        return clusterBuilder;
+    }
+
+    public Session getSession() {
+        return session;
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java
new file mode 100644
index 0000000..0a4a3ec
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.enumerator;
+
+import org.apache.flink.connector.cassandra.source.split.CassandraSplit;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigInteger;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraEnumeratorStateSerializer}. */
+class CassandraEnumeratorStateSerializerTest {
+
+    @Test
+    public void testSerdeRoundtrip() throws Exception {
+        final Queue<CassandraSplit> splitsToReassign =
+                new ArrayDeque<>(
+                        ImmutableList.of(
+                                new CassandraSplit(BigInteger.ZERO, BigInteger.TEN),
+                                new CassandraSplit(BigInteger.TEN, BigInteger.ZERO)));
+
+        final CassandraEnumeratorState cassandraEnumeratorState =
+                new CassandraEnumeratorState(
+                        10, BigInteger.ONE, BigInteger.ZERO, BigInteger.TEN, splitsToReassign);
+
+        final byte[] serialized =
+                CassandraEnumeratorStateSerializer.INSTANCE.serialize(cassandraEnumeratorState);
+        final CassandraEnumeratorState deserialized =
+                CassandraEnumeratorStateSerializer.INSTANCE.deserialize(
+                        CassandraEnumeratorStateSerializer.CURRENT_VERSION, serialized);
+        assertThat(deserialized)
+                .withFailMessage(
+                        "CassandraEnumeratorState is not the same as input object after serde roundtrip")
+                .isEqualTo(cassandraEnumeratorState);
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
new file mode 100644
index 0000000..649eeca
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for query generation and query sanity checks. */
+class CassandraQueryTest {
+
+    @Test
+    public void testKeySpaceTableExtractionRegexp() {
+        Arrays.asList(
+                        "select field FROM keyspace.table where field = 
value;",
+                        "select * FROM keyspace.table;",
+                        "select field1, field2 from keyspace.table;",
+                        "select field1, field2 from keyspace.table 
LIMIT(1000);",
+                        "select field1 from keyspace.table ;",
+                        "select field1 from keyspace.table where field1=1;")
+                .forEach(CassandraQueryTest::assertQueryFormatCorrect);
+
+        Arrays.asList(
+                        "select field1 from table;", // missing keyspace
+                        "select field1 from .table", // undefined keyspace var 
in a script
+                        "select field1 from keyspace.;", // undefined table 
var in a script
+                        "select field1 from keyspace.table" // missing ";"
+                        )
+                .forEach(CassandraQueryTest::assertQueryFormatIncorrect);
+    }
+
+    @Test
+    public void testProhibitedClauses() {
+        Arrays.asList(
+                        "SELECT COUNT(*) from flink.table;",
+                        "SELECT AVG(*) from flink.table;",
+                        "SELECT MIN(*) from flink.table;",
+                        "SELECT MAX(*) from flink.table;",
+                        "SELECT SUM(*) from flink.table;",
+                        "SELECT field1, field2 from flink.table ORDER BY 
field1;",
+                        "SELECT field1, field2 from flink.table GROUP BY 
field1;")
+                .forEach(CassandraQueryTest::assertProhibitedClauseRejected);
+    }
+
+    @Test
+    public void testGenerateRangeQuery() {
+        String query;
+        String outputQuery;
+
+        // query with where clause
+        query = "SELECT field FROM keyspace.table WHERE field = value;";
+        outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+        assertThat(outputQuery)
+                .isEqualTo(
+                        "SELECT field FROM keyspace.table WHERE (token(field) 
>= ?) AND (token(field) < ?) AND field = value;");
+
+        // query without where clause
+        query = "SELECT * FROM keyspace.table;";
+        outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+        assertThat(outputQuery)
+                .isEqualTo(
+                        "SELECT * FROM keyspace.table WHERE (token(field) >= 
?) AND (token(field) < ?);");
+
+        // query without where clause but with another trailing clause
+        query = "SELECT field FROM keyspace.table LIMIT(1000);";
+        outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+        assertThat(outputQuery)
+                .isEqualTo(
+                        "SELECT field FROM keyspace.table WHERE (token(field) 
>= ?) AND (token(field) < ?) LIMIT(1000);");
+
+        // query with where clause and another trailing clause
+        query = "SELECT field FROM keyspace.table WHERE field = value 
LIMIT(1000);";
+        outputQuery = CassandraSplitReader.generateRangeQuery(query, "field");
+        assertThat(outputQuery)
+                .isEqualTo(
+                        "SELECT field FROM keyspace.table WHERE (token(field) 
>= ?) AND (token(field) < ?) AND field = value LIMIT(1000);");
+    }
+
+    private static void assertQueryFormatIncorrect(String query) {
+        assertThatThrownBy(() -> CassandraSource.checkQueryValidity(query))
+                .hasMessageContaining(
+                        "Query must be of the form select ... from 
keyspace.table ...;");
+    }
+
+    private static void assertQueryFormatCorrect(String query) {
+        Matcher matcher = CassandraSource.SELECT_REGEXP.matcher(query);
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+    }
+
+    private static void assertProhibitedClauseRejected(String query) {
+        assertThatThrownBy(() -> CassandraSource.checkQueryValidity(query))
+                .hasMessageContaining(
+                        "Aggregations/OrderBy are not supported because the 
query is executed on subsets/partitions of the input table");
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
new file mode 100644
index 0000000..b79fba6
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraSplitSerializer}. */
+class CassandraSplitSerializerTest {
+
+    @Test
+    public void testSerdeRoundtrip() throws IOException {
+        final CassandraSplit testData = new CassandraSplit(BigInteger.ONE, BigInteger.TEN);
+        final byte[] serialized = CassandraSplitSerializer.INSTANCE.serialize(testData);
+        final CassandraSplit deserialized =
+                CassandraSplitSerializer.INSTANCE.deserialize(
+                        CassandraSplitSerializer.CURRENT_VERSION, serialized);
+        assertThat(deserialized)
+                .withFailMessage(
+                        "CassandraSplit is not the same as input object after serde roundtrip")
+                .isEqualTo(testData);
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
similarity index 73%
rename from flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
rename to flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
index 559f107..a32e492 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.batch.connectors.cassandra.example;
+package org.apache.flink.connectors.cassandra.utils;
 
 import com.datastax.driver.mapping.annotations.Column;
 import com.datastax.driver.mapping.annotations.Table;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /** Test Pojo with DataStax annotations used. */
 @Table(keyspace = "flink", name = "batches")
@@ -69,4 +70,27 @@ public class Pojo implements Serializable {
     public void setBatchID(int batchId) {
         this.batchID = batchId;
     }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "{\"id\":\"%s\", \"counter\":%d, \"batchID\":%d}", id, 
counter, batchID);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Pojo pojo = (Pojo) o;
+        return counter == pojo.counter && batchID == pojo.batchID && id.equals(pojo.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, counter, batchID);
+    }
 }
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 8cf47a1..770d68b 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,7 +25,7 @@ under the License.
 <suppressions>
        <!-- Cassandra connectors have to use guava directly -->
        <suppress
-               files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|OutputFormatBase.java|OutputFormatBaseTest.java|CassandraColumnarOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java"
+               files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|OutputFormatBase.java|OutputFormatBaseTest.java|CassandraColumnarOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java|CassandraRecordEmitter.java"
                checks="IllegalImport"/>
 
 </suppressions>
