This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git
The following commit(s) were added to refs/heads/main by this push: new 72e3bef [FLINK-26822] Add Cassandra Source 72e3bef is described below commit 72e3bef1fb9ee6042955b5e9871a9f70a8837cca Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Wed Mar 22 09:38:17 2023 +0100 [FLINK-26822] Add Cassandra Source --- flink-connector-cassandra/pom.xml | 7 + .../cassandra/source/CassandraSource.java | 203 +++++++++++++++++ .../enumerator/CassandraEnumeratorState.java | 167 ++++++++++++++ .../CassandraEnumeratorStateSerializer.java | 106 +++++++++ .../enumerator/CassandraSplitEnumerator.java | 148 +++++++++++++ .../source/reader/CassandraRecordEmitter.java | 138 ++++++++++++ .../cassandra/source/reader/CassandraRow.java | 46 ++++ .../source/reader/CassandraSourceReader.java | 104 +++++++++ .../reader/CassandraSourceReaderFactory.java | 55 +++++ .../source/reader/CassandraSplitReader.java | 205 +++++++++++++++++ .../cassandra/source/split/CassandraSplit.java | 75 +++++++ .../source/split/CassandraSplitSerializer.java | 63 ++++++ .../cassandra/source/split/SplitsGenerator.java | 242 +++++++++++++++++++++ .../source/utils/BigIntegerSerializationUtils.java | 40 ++++ .../cassandra/example/BatchPojoExample.java | 1 + .../cassandra/source/CassandraSourceITCase.java | 234 ++++++++++++++++++++ .../cassandra/source/CassandraTestContext.java | 161 ++++++++++++++ .../cassandra/source/CassandraTestEnvironment.java | 196 +++++++++++++++++ .../CassandraEnumeratorStateSerializerTest.java | 58 +++++ .../source/reader/CassandraQueryTest.java | 119 ++++++++++ .../source/split/CassandraSplitSerializerTest.java | 43 ++++ .../cassandra/utils}/Pojo.java | 26 ++- tools/maven/suppressions.xml | 2 +- 23 files changed, 2437 insertions(+), 2 deletions(-) diff --git a/flink-connector-cassandra/pom.xml b/flink-connector-cassandra/pom.xml index fa78a65..58d70b4 100644 --- a/flink-connector-cassandra/pom.xml +++ b/flink-connector-cassandra/pom.xml @@ -78,6 +78,13 @@ under the License. <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java new file mode 100644 index 0000000..dd45913 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorStateSerializer; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraSplitEnumerator; +import org.apache.flink.connector.cassandra.source.reader.CassandraSourceReaderFactory; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bounded source to read from Cassandra and return a collection of entities as {@code + * DataStream<Entity>}. An entity is built by the Cassandra mapper ({@code + * com.datastax.driver.mapping.EntityMapper}) based on a POJO containing annotations (as described + * in <a + * href="https://docs.datastax.com/en/developer/java-driver/3.11/manual/object_mapper/creating/"> + * Cassandra object mapper</a>). + * + * <p>To use it, do the following: + * + * <pre>{@code + * ClusterBuilder clusterBuilder = new ClusterBuilder() { + * @Override + * protected Cluster buildCluster(Cluster.Builder builder) { + * return builder.addContactPointsWithPorts(new InetSocketAddress(HOST,PORT)) + * .withQueryOptions(new QueryOptions().setConsistencyLevel(CL)) + * .withSocketOptions(new SocketOptions() + * .setConnectTimeoutMillis(CONNECT_TIMEOUT) + * .setReadTimeoutMillis(READ_TIMEOUT)) + * .build(); + * } + * }; + * long maxSplitMemorySize = ... //optional max split size in bytes. If not set, it defaults to 64 MB (the allowed minimum is 10 MB) + * Source cassandraSource = new CassandraSource(clusterBuilder, + * maxSplitMemorySize, + * Pojo.class, + * "select ... 
from KEYSPACE.TABLE ...;", + * () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}); + * + * DataStream<Pojo> stream = env.fromSource(cassandraSource, WatermarkStrategy.noWatermarks(), + * "CassandraSource"); + * }</pre> + */ +@PublicEvolving +public class CassandraSource<OUT> + implements Source<OUT, CassandraSplit, CassandraEnumeratorState>, ResultTypeQueryable<OUT> { + + public static final Pattern CQL_PROHIBITED_CLAUSES_REGEXP = + Pattern.compile("(?i).*(AVG|COUNT|MIN|MAX|SUM|ORDER|GROUP BY).*"); + public static final Pattern SELECT_REGEXP = + Pattern.compile("(?i)select .+ from (\\w+)\\.(\\w+).*;$"); + + private static final long serialVersionUID = 1L; + + private final ClusterBuilder clusterBuilder; + private final Class<OUT> pojoClass; + private final String query; + private final String keyspace; + private final String table; + private final MapperOptions mapperOptions; + + private final long maxSplitMemorySize; + private static final long MIN_SPLIT_MEMORY_SIZE = MemorySize.ofMebiBytes(10).getBytes(); + static final long MAX_SPLIT_MEMORY_SIZE_DEFAULT = MemorySize.ofMebiBytes(64).getBytes(); + + public CassandraSource( + ClusterBuilder clusterBuilder, + Class<OUT> pojoClass, + String query, + MapperOptions mapperOptions) { + this(clusterBuilder, MAX_SPLIT_MEMORY_SIZE_DEFAULT, pojoClass, query, mapperOptions); + } + + public CassandraSource( + ClusterBuilder clusterBuilder, + long maxSplitMemorySize, + Class<OUT> pojoClass, + String query, + MapperOptions mapperOptions) { + checkNotNull(clusterBuilder, "ClusterBuilder required but not provided"); + checkNotNull(pojoClass, "POJO class required but not provided"); + checkNotNull(query, "query required but not provided"); + checkState( + maxSplitMemorySize >= MIN_SPLIT_MEMORY_SIZE, + "Defined maxSplitMemorySize (%s) is below minimum (%s)", + maxSplitMemorySize, + MIN_SPLIT_MEMORY_SIZE); + this.maxSplitMemorySize = maxSplitMemorySize; + final Matcher queryMatcher = checkQueryValidity(query); + this.query = query; + this.keyspace = queryMatcher.group(1); + this.table = queryMatcher.group(2); + this.clusterBuilder = clusterBuilder; + ClosureCleaner.clean(clusterBuilder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + this.pojoClass = pojoClass; + this.mapperOptions = mapperOptions; + } + + @VisibleForTesting + public static Matcher checkQueryValidity(String query) { + checkState( + !query.matches(CQL_PROHIBITED_CLAUSES_REGEXP.pattern()), + "Aggregations/OrderBy are not supported because the query is executed on subsets/partitions of the input table"); + final Matcher queryMatcher = SELECT_REGEXP.matcher(query); + checkState( + queryMatcher.matches(), + "Query must be of the form select ... 
from keyspace.table ...;"); + return queryMatcher; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Internal + @Override + public SourceReader<OUT, CassandraSplit> createReader(SourceReaderContext readerContext) { + return new CassandraSourceReaderFactory<OUT>() + .create( + readerContext, + clusterBuilder, + pojoClass, + query, + keyspace, + table, + mapperOptions); + } + + @Internal + @Override + public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> createEnumerator( + SplitEnumeratorContext<CassandraSplit> enumContext) { + return new CassandraSplitEnumerator( + enumContext, null, clusterBuilder, maxSplitMemorySize, keyspace, table); + } + + @Internal + @Override + public SplitEnumerator<CassandraSplit, CassandraEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<CassandraSplit> enumContext, + CassandraEnumeratorState enumCheckpoint) { + return new CassandraSplitEnumerator( + enumContext, enumCheckpoint, clusterBuilder, maxSplitMemorySize, keyspace, table); + } + + @Internal + @Override + public SimpleVersionedSerializer<CassandraSplit> getSplitSerializer() { + return CassandraSplitSerializer.INSTANCE; + } + + @Internal + @Override + public SimpleVersionedSerializer<CassandraEnumeratorState> getEnumeratorCheckpointSerializer() { + return CassandraEnumeratorStateSerializer.INSTANCE; + } + + @Override + public TypeInformation<OUT> getProducedType() { + return TypeInformation.of(pojoClass); + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java new file mode 100644 index 0000000..c16fe13 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.math.BigInteger; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Objects; +import java.util.Queue; + +/** + * State for {@link CassandraSplitEnumerator}. It stores the offset ({@code startToken}) of the last + * lazy {@link CassandraSplit} generation and the number of splits left to generate. Upon + * restoration of this state, {@link SplitsGenerator#prepareSplits()} is obviously not re-run. 
So we + * need to store also the result of this initial splits preparation ({@code increment} and {@code + * maxToken}). + */ +public class CassandraEnumeratorState { + private static final Logger LOG = LoggerFactory.getLogger(CassandraEnumeratorState.class); + private long numSplitsLeftToGenerate; + private BigInteger increment; + private BigInteger startToken; + private BigInteger maxToken; + // splits that were assigned to a failed reader and that were not part of a checkpoint, so after + // restoration, they need to be reassigned. + private final Queue<CassandraSplit> splitsToReassign; + + CassandraEnumeratorState() { + this.splitsToReassign = new ArrayDeque<>(); + } + + public CassandraEnumeratorState( + long numSplitsLeftToGenerate, + BigInteger increment, + BigInteger startToken, + BigInteger maxToken, + Queue<CassandraSplit> splitsToReassign) { + this.numSplitsLeftToGenerate = numSplitsLeftToGenerate; + this.increment = increment; + this.startToken = startToken; + this.maxToken = maxToken; + this.splitsToReassign = splitsToReassign; + } + + Queue<CassandraSplit> getSplitsToReassign() { + return splitsToReassign; + } + + public long getNumSplitsLeftToGenerate() { + return numSplitsLeftToGenerate; + } + + BigInteger getIncrement() { + return increment; + } + + BigInteger getStartToken() { + return startToken; + } + + BigInteger getMaxToken() { + return maxToken; + } + + void addSplitsBack(Collection<CassandraSplit> splits) { + LOG.info( + "Add {} splits back to CassandraSplitEnumerator for reassignment after failover", + splits.size()); + splitsToReassign.addAll(splits); + } + + /** + * Provide a {@link CassandraSplit} that was assigned to a failed reader or lazily create one. + * Splits contain a range of the Cassandra ring of {@code maxSplitMemorySize}. There is no way + * to estimate the size of the data with the optional SQL filters without reading the data. So + * the split can be smaller than {@code maxSplitMemorySize} when the query is actually executed. + */ + public @Nullable CassandraSplit getNextSplit() { + // serve slits to reassign first + final CassandraSplit splitToReassign = splitsToReassign.poll(); + if (splitToReassign != null) { + return splitToReassign; + } // else no more splits to reassign, generate one + if (numSplitsLeftToGenerate == 0) { + return null; // enumerator will send the no more split message to the requesting reader + } + BigInteger endToken = + numSplitsLeftToGenerate == 1 + // last split to generate, round up to the last token of the ring + ? 
maxToken + : startToken.add(increment); + CassandraSplit split = new CassandraSplit(startToken, endToken); + // prepare for next call + this.startToken = endToken; + numSplitsLeftToGenerate--; + return split; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CassandraEnumeratorState that = (CassandraEnumeratorState) o; + if (this.splitsToReassign.size() != that.splitsToReassign.size()) { + return false; + } + for (CassandraSplit cassandraSplit : splitsToReassign) { + if (!that.splitsToReassign.contains(cassandraSplit)) { + return false; + } + } + return numSplitsLeftToGenerate == that.numSplitsLeftToGenerate + && increment.equals(that.increment) + && startToken.equals(that.startToken) + && maxToken.equals(that.maxToken); + } + + @Override + public int hashCode() { + return Objects.hash( + numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign); + } + + @Override + public String toString() { + return "CassandraEnumeratorState{" + + "numSplitsLeftToGenerate=" + + numSplitsLeftToGenerate + + ", increment=" + + increment + + ", startToken=" + + startToken + + ", maxToken=" + + maxToken + + ", splitsToReassign=" + + splitsToReassign + + '}'; + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java new file mode 100644 index 0000000..3725d47 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.CassandraSplitSerializer; +import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.IOUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.math.BigInteger; +import java.util.ArrayDeque; +import java.util.Queue; + +/** Serializer for {@link CassandraEnumeratorState}. 
*/ +public class CassandraEnumeratorStateSerializer + implements SimpleVersionedSerializer<CassandraEnumeratorState> { + + public static final CassandraEnumeratorStateSerializer INSTANCE = + new CassandraEnumeratorStateSerializer(); + public static final int CURRENT_VERSION = 0; + + private CassandraEnumeratorStateSerializer() {} + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(CassandraEnumeratorState cassandraEnumeratorState) throws IOException { + try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteArrayOutputStream)) { + final Queue<CassandraSplit> splitsToReassign = + cassandraEnumeratorState.getSplitsToReassign(); + objectOutputStream.writeInt(splitsToReassign.size()); + for (CassandraSplit cassandraSplit : splitsToReassign) { + final byte[] serializedSplit = + CassandraSplitSerializer.INSTANCE.serialize(cassandraSplit); + objectOutputStream.writeInt(serializedSplit.length); + objectOutputStream.write(serializedSplit); + } + + objectOutputStream.writeLong(cassandraEnumeratorState.getNumSplitsLeftToGenerate()); + BigIntegerSerializationUtils.write( + cassandraEnumeratorState.getIncrement(), objectOutputStream); + BigIntegerSerializationUtils.write( + cassandraEnumeratorState.getStartToken(), objectOutputStream); + BigIntegerSerializationUtils.write( + cassandraEnumeratorState.getMaxToken(), objectOutputStream); + + objectOutputStream.flush(); + return byteArrayOutputStream.toByteArray(); + } + } + + @Override + public CassandraEnumeratorState deserialize(int version, byte[] serialized) throws IOException { + try (final ByteArrayInputStream byteArrayInputStream = + new ByteArrayInputStream(serialized); + final ObjectInputStream objectInputStream = + new ObjectInputStream(byteArrayInputStream)) { + final Queue<CassandraSplit> splitsToReassign = new ArrayDeque<>(); + final int splitsToReassignSize = objectInputStream.readInt(); + for (int i = 0; i < splitsToReassignSize; i++) { + final int splitSize = objectInputStream.readInt(); + final byte[] splitBytes = new byte[splitSize]; + IOUtils.readFully(objectInputStream, splitBytes, 0, splitSize); + final CassandraSplit split = + CassandraSplitSerializer.INSTANCE.deserialize( + CassandraSplitSerializer.CURRENT_VERSION, splitBytes); + splitsToReassign.add(split); + } + + final long numSplitsLeftToGenerate = objectInputStream.readLong(); + final BigInteger increment = BigIntegerSerializationUtils.read(objectInputStream); + final BigInteger startToken = BigIntegerSerializationUtils.read(objectInputStream); + final BigInteger maxToken = BigIntegerSerializationUtils.read(objectInputStream); + + return new CassandraEnumeratorState( + numSplitsLeftToGenerate, increment, startToken, maxToken, splitsToReassign); + } + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java new file mode 100644 index 0000000..1337846 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; + +/** {@link SplitEnumerator} that splits Cassandra cluster into {@link CassandraSplit}s. */ +public final class CassandraSplitEnumerator + implements SplitEnumerator<CassandraSplit, CassandraEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitEnumerator.class); + + private final SplitEnumeratorContext<CassandraSplit> enumeratorContext; + private CassandraEnumeratorState state; + private final Cluster cluster; + private final Long maxSplitMemorySize; + private final Session session; + private final String keyspace; + private final String table; + + public CassandraSplitEnumerator( + SplitEnumeratorContext<CassandraSplit> enumeratorContext, + CassandraEnumeratorState state, + ClusterBuilder clusterBuilder, + Long maxSplitMemorySize, + String keyspace, + String table) { + this.enumeratorContext = enumeratorContext; + this.state = state == null ? new CassandraEnumeratorState() : state /* snapshot restore*/; + this.cluster = clusterBuilder.getCluster(); + this.maxSplitMemorySize = maxSplitMemorySize; + this.session = cluster.newSession(); + this.keyspace = keyspace; + this.table = table; + } + + @Override + public void start() { + enumeratorContext.callAsync( + this::prepareSplits, + (preparedState, throwable) -> { + LOG.debug("Initialized CassandraEnumeratorState: {}", preparedState.toString()); + state = preparedState; + }); + } + + private CassandraEnumeratorState prepareSplits() { + final int parallelism = enumeratorContext.currentParallelism(); + final String partitionerName = cluster.getMetadata().getPartitioner(); + final SplitsGenerator.CassandraPartitioner partitioner = + partitionerName.contains(MURMUR3PARTITIONER.getClassName()) + ? 
MURMUR3PARTITIONER + : RANDOMPARTITIONER; + final SplitsGenerator splitsGenerator = + new SplitsGenerator( + partitioner, session, keyspace, table, parallelism, maxSplitMemorySize); + return splitsGenerator.prepareSplits(); + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + checkReaderRegistered(subtaskId); + final CassandraSplit cassandraSplit = state.getNextSplit(); + if (cassandraSplit != null) { + LOG.info("Assigning splits to reader {}", subtaskId); + enumeratorContext.assignSplit(cassandraSplit, subtaskId); + } else { + LOG.info( + "No split assigned to reader {} because the enumerator has no unassigned split left. Sending NoMoreSplitsEvent to reader", + subtaskId); + enumeratorContext.signalNoMoreSplits(subtaskId); + } + } + + @Override + public void addSplitsBack(List<CassandraSplit> splits, int subtaskId) { + // splits that were assigned to a failed reader and that were not part of a checkpoint, so + // after restoration, they need to be reassigned + state.addSplitsBack(splits); + } + + @Override + public void addReader(int subtaskId) { + // nothing to do on reader registration as the CassandraSplits are generated lazily + } + + private void checkReaderRegistered(int readerId) { + if (!enumeratorContext.registeredReaders().containsKey(readerId)) { + throw new IllegalStateException( + String.format("Reader %d is not registered to source coordinator", readerId)); + } + } + + @Override + public CassandraEnumeratorState snapshotState(long checkpointId) { + return state; + } + + @Override + public void close() throws IOException { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java new file mode 100644 index 0000000..8287057 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ExecutionInfo; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +/** + * {@link RecordEmitter} that converts the {@link CassandraRow} read by the {@link + * CassandraSplitReader} to the specified POJO and outputs it. This class uses the Cassandra driver + * mapper to map the row to the POJO. + * + * @param <OUT> type of POJO record to output + */ +class CassandraRecordEmitter<OUT> implements RecordEmitter<CassandraRow, OUT, CassandraSplit> { + + private final Function<ResultSet, OUT> map; + + public CassandraRecordEmitter(Function<ResultSet, OUT> map) { + this.map = map; + } + + @Override + public void emitRecord( + CassandraRow cassandraRow, SourceOutput<OUT> output, CassandraSplit cassandraSplit) { + // Mapping from a row to a Class<OUT> is a complex operation involving the reflection API. + // It is better to use the Cassandra mapper for it. + // But the mapper takes only a ResultSet as input, hence we forge one containing only the Row + ResultSet resultSet = new SingleRowResultSet(cassandraRow); + // output the pojo based on the cassandraRow + output.collect(map.apply(resultSet)); + } + + private static class SingleRowResultSet implements ResultSet { + private final CassandraRow cassandraRow; + private final Row row; + + private SingleRowResultSet(CassandraRow cassandraRow) { + this.cassandraRow = cassandraRow; + this.row = cassandraRow.getRow(); + } + + @Override + public Row one() { + return row; + } + + @Override + public ColumnDefinitions getColumnDefinitions() { + return row.getColumnDefinitions(); + } + + @Override + public boolean wasApplied() { + return true; + } + + @Override + public boolean isExhausted() { + return true; + } + + @Override + public boolean isFullyFetched() { + return true; + } + + @Override + public int getAvailableWithoutFetching() { + return 1; + } + + @Override + public ListenableFuture<ResultSet> fetchMoreResults() { + return Futures.immediateFuture(null); + } + + @Override + public List<Row> all() { + return Collections.singletonList(row); + } + + @Override + public Iterator<Row> iterator() { + return new Iterator<Row>() { + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Row next() { + return row; + } + }; + } + + @Override + public ExecutionInfo getExecutionInfo() { + return cassandraRow.getExecutionInfo(); + } + + @Override + public List<ExecutionInfo> getAllExecutionInfo() { + return Collections.singletonList(cassandraRow.getExecutionInfo()); + } + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java new file mode 100644 index 0000000..11c078e --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRow.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import com.datastax.driver.core.ExecutionInfo; +import com.datastax.driver.core.Row; + +/** + * Wrapper for a Cassandra {@link Row} that also stores the {@link ExecutionInfo} statistics about + * the query execution that produced this row. {@link ExecutionInfo} is useful when using the + * Cassandra mapper to translate the row to a POJO. + */ +public class CassandraRow { + + private final Row row; + private final ExecutionInfo executionInfo; + + public CassandraRow(Row row, ExecutionInfo executionInfo) { + this.row = row; + this.executionInfo = executionInfo; + } + + public Row getRow() { + return row; + } + + public ExecutionInfo getExecutionInfo() { + return executionInfo; + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java new file mode 100644 index 0000000..66eefcb --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Cassandra {@link SourceReader} that reads one {@link CassandraSplit} using a single thread. 
+ * + * @param <OUT> the type of elements produced by the source + */ +class CassandraSourceReader<OUT> + extends SingleThreadMultiplexSourceReaderBase< + CassandraRow, OUT, CassandraSplit, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSourceReader.class); + + private final Cluster cluster; + private final Session session; + + // created by the factory + CassandraSourceReader( + SourceReaderContext context, + String query, + String keyspace, + String table, + Cluster cluster, + Session session, + Mapper<OUT> mapper) { + super( + () -> new CassandraSplitReader(cluster, session, query, keyspace, table), + new CassandraRecordEmitter<>(resultSet -> mapper.map(resultSet).one()), + context.getConfiguration(), + context); + this.cluster = cluster; + this.session = session; + } + + @Override + public void start() { + context.sendSplitRequest(); + } + + @Override + protected void onSplitFinished(Map<String, CassandraSplit> finishedSplitIds) { + context.sendSplitRequest(); + } + + @Override + protected CassandraSplit initializedState(CassandraSplit cassandraSplit) { + return cassandraSplit; + } + + @Override + protected CassandraSplit toSplitType(String splitId, CassandraSplit cassandraSplit) { + return cassandraSplit; + } + + @Override + public void close() throws Exception { + super.close(); + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); + } + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java new file mode 100644 index 0000000..3d06097 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReaderFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.streaming.connectors.cassandra.MapperOptions; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; + +/** + * Factory to create {@link CassandraSourceReader}s and allow sharing the cluster and the session + * objects. 
+ */ +public class CassandraSourceReaderFactory<OUT> { + public CassandraSourceReader<OUT> create( + SourceReaderContext context, + ClusterBuilder clusterBuilder, + Class<OUT> pojoClass, + String query, + String keyspace, + String table, + MapperOptions mapperOptions) { + Cluster cluster = clusterBuilder.getCluster(); + Session session = cluster.connect(); + Mapper<OUT> mapper = new MappingManager(session).mapper(pojoClass); + if (mapperOptions != null) { + Mapper.Option[] optionsArray = mapperOptions.getMapperOptions(); + if (optionsArray != null) { + mapper.setDefaultGetOptions(optionsArray); + } + } + return new CassandraSourceReader<>( + context, query, keyspace, table, cluster, session, mapper); + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java new file mode 100644 index 0000000..2344705 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.reader; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.cassandra.source.CassandraSource; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + +/** + * {@link SplitReader} for Cassandra source. This class is responsible for fetching the records as + * {@link CassandraRow}s. For that, it executes a range query (query that outputs records belonging + * to Cassandra token range) based on the user specified query. 
+ */ +class CassandraSplitReader implements SplitReader<CassandraRow, CassandraSplit> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraSplitReader.class); + + private final Cluster cluster; + private final Session session; + private final Set<CassandraSplit> unprocessedSplits; + private final AtomicBoolean wakeup = new AtomicBoolean(false); + private final String query; + private final String keyspace; + private final String table; + + public CassandraSplitReader( + Cluster cluster, Session session, String query, String keyspace, String table) { + this.unprocessedSplits = new HashSet<>(); + this.query = query; + this.keyspace = keyspace; + this.table = table; + this.cluster = cluster; + this.session = session; + } + + @Override + public RecordsWithSplitIds<CassandraRow> fetch() { + Map<String, Collection<CassandraRow>> recordsBySplit = new HashMap<>(); + Set<String> finishedSplits = new HashSet<>(); + + Metadata clusterMetadata = cluster.getMetadata(); + String partitionKey = getPartitionKey(clusterMetadata); + String finalQuery = generateRangeQuery(query, partitionKey); + PreparedStatement preparedStatement = session.prepare(finalQuery); + + // Set wakeup to false to start consuming + wakeup.compareAndSet(true, false); + for (CassandraSplit cassandraSplit : unprocessedSplits) { + // allow to interrupt the reading of splits especially the blocking session.execute() + // call as requested in the API + if (wakeup.get()) { + break; + } + try { + Token startToken = + clusterMetadata.newToken(cassandraSplit.getRingRangeStart().toString()); + Token endToken = + clusterMetadata.newToken(cassandraSplit.getRingRangeEnd().toString()); + final ResultSet resultSet = + session.execute( + preparedStatement + .bind() + .setToken(0, startToken) + .setToken(1, endToken)); + // add all the records of the split to the output (in memory). + // It is safe because each split has a configurable maximum memory size + addRecordsToOutput(resultSet, cassandraSplit, recordsBySplit); + // add the already read (or even empty) split to finished splits + finishedSplits.add(cassandraSplit.splitId()); + // for reentrant calls: if fetch is restarted, + // do not reprocess the already processed splits + unprocessedSplits.remove(cassandraSplit); + } catch (Exception ex) { + LOG.error("Error while reading split ", ex); + } + } + return new RecordsBySplits<>(recordsBySplit, finishedSplits); + } + + private String getPartitionKey(Metadata clusterMetadata) { + return clusterMetadata.getKeyspace(keyspace).getTable(table).getPartitionKey().stream() + .map(ColumnMetadata::getName) + .collect(Collectors.joining(",")); + } + + @Override + public void wakeUp() { + wakeup.compareAndSet(false, true); + } + + @Override + public void handleSplitsChanges(SplitsChange<CassandraSplit> splitsChanges) { + unprocessedSplits.addAll(splitsChanges.splits()); + } + + /** + * Utility method to add the ring token filtering clauses to the user query to generate the + * split query. For example: + * + * <ul> + * <li><code>"select * from + * keyspace.table where field1=value1;"</code> will be transformed into <code> + * "select * from + * keyspace.table where (token(partitionKey) >= ?) AND (token(partitionKey) < ?) AND + * field1=value1;"</code> + * <li><code>"select * from + * keyspace.table;"</code> will be transformed into <code> + * "select * from keyspace.table WHERE + * (token(%s) >= ?) 
AND (token(%s) < ?);"</code> + * </ul> + * + * @param query the user input query + * @param partitionKey Cassandra partition key of the user provided table + * @return the final split query that will be sent to the Cassandra cluster + */ + @VisibleForTesting + static String generateRangeQuery(String query, String partitionKey) { + Matcher queryMatcher = CassandraSource.SELECT_REGEXP.matcher(query); + if (!queryMatcher.matches()) { + throw new IllegalStateException( + String.format( + "Failed to generate range query out of the provided query: %s", query)); + } + final int whereIndex = query.toLowerCase().indexOf("where"); + int insertionPoint; + String filter; + if (whereIndex != -1) { + insertionPoint = whereIndex + "where".length(); + filter = + String.format( + " (token(%s) >= ?) AND (token(%s) < ?) AND", + partitionKey, partitionKey); + } else { + // end of keyspace.table + insertionPoint = queryMatcher.end(2); + filter = + String.format( + " WHERE (token(%s) >= ?) AND (token(%s) < ?)", + partitionKey, partitionKey); + } + return String.format( + "%s%s%s", + query.substring(0, insertionPoint), filter, query.substring(insertionPoint)); + } + + /** + * This method populates the {@code Map<String, Collection<CassandraRow>> recordsBySplit} map + * that is used to create the {@link RecordsBySplits} that are output by the fetch method. It + * modifies its {@code output} parameter. + */ + private void addRecordsToOutput( + ResultSet resultSet, + CassandraSplit cassandraSplit, + Map<String, Collection<CassandraRow>> output) { + resultSet.forEach( + row -> + output.computeIfAbsent(cassandraSplit.splitId(), id -> new ArrayList<>()) + .add(new CassandraRow(row, resultSet.getExecutionInfo()))); + } + + @Override + public void close() throws Exception { + // nothing to do as the cluster/session is managed by the SourceReader + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java new file mode 100644 index 0000000..556b87d --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Immutable {@link SourceSplit} for Cassandra source. A Cassandra split is a slice of the Cassandra + * tokens ring (i.e. a ringRange). 
+ */ +public class CassandraSplit implements SourceSplit, Serializable { + + private final BigInteger ringRangeStart; + private final BigInteger ringRangeEnd; + + public CassandraSplit(BigInteger ringRangeStart, BigInteger ringRangeEnd) { + this.ringRangeStart = ringRangeStart; + this.ringRangeEnd = ringRangeEnd; + } + + public BigInteger getRingRangeStart() { + return ringRangeStart; + } + + public BigInteger getRingRangeEnd() { + return ringRangeEnd; + } + + @Override + public String splitId() { + return String.format("(%s,%s)", ringRangeStart.toString(), ringRangeEnd.toString()); + } + + @Override + public String toString() { + return splitId(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CassandraSplit other = (CassandraSplit) o; + return ringRangeStart.equals(other.ringRangeStart) + && ringRangeEnd.equals(other.ringRangeEnd); + } + + @Override + public int hashCode() { + return 31 * ringRangeStart.hashCode() + ringRangeEnd.hashCode(); + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java new file mode 100644 index 0000000..74fa573 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.connector.cassandra.source.utils.BigIntegerSerializationUtils; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; +import java.math.BigInteger; + +/** Serializer for {@link CassandraSplit}. 
*/ +public class CassandraSplitSerializer implements SimpleVersionedSerializer<CassandraSplit> { + + public static final CassandraSplitSerializer INSTANCE = new CassandraSplitSerializer(); + private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE = + ThreadLocal.withInitial(() -> new DataOutputSerializer(64)); + + public static final int CURRENT_VERSION = 0; + + private CassandraSplitSerializer() {} + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(CassandraSplit cassandraSplit) throws IOException { + final DataOutputSerializer out = SERIALIZER_CACHE.get(); + BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeStart(), out); + BigIntegerSerializationUtils.write(cassandraSplit.getRingRangeEnd(), out); + final byte[] result = out.getCopyOfBuffer(); + out.clear(); + return result; + } + + @Override + public CassandraSplit deserialize(int version, byte[] serialized) throws IOException { + final DataInputDeserializer in = new DataInputDeserializer(serialized); + + final BigInteger ringRangeStart = BigIntegerSerializationUtils.read(in); + final BigInteger ringRangeEnd = BigIntegerSerializationUtils.read(in); + return new CassandraSplit(ringRangeStart, ringRangeEnd); + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java new file mode 100644 index 0000000..e44be34 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/SplitsGenerator.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.split; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; + +/** + * This class prepares the generation of {@link CassandraSplit}s based on Cassandra cluster + * partitioner and cluster statistics. It estimates the total size of the table using Cassandra + * system table system.size_estimates. 
+ */ +public final class SplitsGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(SplitsGenerator.class); + + private final CassandraPartitioner partitioner; + private final Session session; + private final String keyspace; + private final String table; + private final int parallelism; + private final long maxSplitMemorySize; + + public SplitsGenerator( + CassandraPartitioner partitioner, + Session session, + String keyspace, + String table, + int parallelism, + long maxSplitMemorySize) { + this.partitioner = partitioner; + this.session = session; + this.keyspace = keyspace; + this.table = table; + this.parallelism = parallelism; + this.maxSplitMemorySize = maxSplitMemorySize; + } + + /** + * Prepare the {@link CassandraEnumeratorState} for lazy generation of {@link CassandraSplit}s: + * calculate {@code numSplitsToGenerate} based on estimated target table size and provided + * {@code maxSplitMemorySize} and calculate {@code increment}, which is the size of a split in + * tokens. + */ + public CassandraEnumeratorState prepareSplits() { + final long numSplitsToGenerate = decideOnNumSplits(); + final BigInteger increment = + (partitioner.ringSize).divide(new BigInteger(String.valueOf(numSplitsToGenerate))); + final BigInteger startToken = partitioner.minToken; + return new CassandraEnumeratorState( + numSplitsToGenerate, + increment, + startToken, + partitioner.maxToken, + new ArrayDeque<>()); + } + + /** + * Determine {@code numSplits} based on the estimation of the target table size and the + * configured {@code maxSplitMemorySize}. Provide fallbacks when the table size is unavailable + * or when too few splits would be calculated. + */ + private long decideOnNumSplits() { + long numSplits; + final long estimateTableSize = estimateTableSize(); + if (estimateTableSize == 0) { // size estimates unavailable + LOG.info( + "Cassandra size estimates are not available for {}.{} table. Creating as many splits as parallelism ({})", + keyspace, + table, + parallelism); + numSplits = parallelism; + } else { // create estimateTableSize / maxSplitMemorySize splits, or create + // parallelism splits if that makes too few splits. + LOG.debug( + "Estimated size for {}.{} table is {} bytes", + keyspace, + table, + estimateTableSize); + numSplits = + estimateTableSize / maxSplitMemorySize == 0 + ? parallelism + : estimateTableSize / maxSplitMemorySize; + LOG.info( + "maxSplitMemorySize set value ({}) leads to the creation of {} splits", + maxSplitMemorySize, + numSplits); + } + return numSplits; + } + + /** + * Estimates the size of the table in bytes. Cassandra size estimates can be 0 if the data was + * just inserted and the amount of data in the table was small. This is a very common situation + * during tests. + */ + @VisibleForTesting + public long estimateTableSize() { + List<TokenRange> tokenRanges = getTokenRangesOfTable(); + long size = 0L; + for (TokenRange tokenRange : tokenRanges) { + size += tokenRange.meanPartitionSize * tokenRange.partitionCount; + } + final float ringFraction = getRingFraction(tokenRanges); + // ringFraction can be 0 if the size estimates are not available + return ringFraction != 0 ? Math.round(size / ringFraction) : 0L; + } + + /** + * The values that we get from system.size_estimates are for one node. We need to extrapolate to + * the whole cluster. This method estimates the percentage the node represents in the cluster. 
+     *
+     * @param tokenRanges The list of {@link TokenRange}s that the node owns
+     * @return The fraction of the ring that the node represents in the whole cluster
+     */
+    private float getRingFraction(List<TokenRange> tokenRanges) {
+        BigInteger addressedTokens = BigInteger.ZERO;
+        for (TokenRange tokenRange : tokenRanges) {
+            addressedTokens =
+                    addressedTokens.add(distance(tokenRange.rangeStart, tokenRange.rangeEnd));
+        }
+        // the result is <= 1 because a node cannot own more than the whole ring. Use
+        // floating-point division: BigInteger#divide is an integer division that would
+        // truncate any partial ring ownership to 0.
+        return (float) (addressedTokens.doubleValue() / partitioner.ringSize.doubleValue());
+    }
+
+    /** Gets the list of token ranges that the table occupies on a given Cassandra node. */
+    private List<TokenRange> getTokenRangesOfTable() {
+        ResultSet resultSet =
+                session.execute(
+                        "SELECT range_start, range_end, partitions_count, mean_partition_size FROM "
+                                + "system.size_estimates WHERE keyspace_name = ? AND table_name = ?",
+                        keyspace,
+                        table);
+
+        ArrayList<TokenRange> tokenRanges = new ArrayList<>();
+        for (Row row : resultSet) {
+            TokenRange tokenRange =
+                    new TokenRange(
+                            row.getLong("partitions_count"),
+                            row.getLong("mean_partition_size"),
+                            row.getString("range_start"),
+                            row.getString("range_end"));
+            tokenRanges.add(tokenRange);
+        }
+        // The table may not contain the estimates yet
+        // or have partitions_count and mean_partition_size fields = 0
+        // if the data was just inserted and the amount of data in the table was small.
+        // This is a very common situation during tests,
+        // when we insert a few rows and immediately query them.
+        // However, for tiny data sets the lack of size estimates is not a problem at all,
+        // because we don't want to split tiny data anyway.
+        // Therefore, we're not issuing a warning if the result set was empty
+        // or mean_partition_size and partitions_count = 0.
+        return tokenRanges;
+    }
+
+    /**
+     * Measure the distance between two tokens.
+     *
+     * @param token1 The measure is symmetrical, so token1 and token2 can be exchanged
+     * @param token2 The measure is symmetrical, so token1 and token2 can be exchanged
+     * @return Number of tokens that separate token1 and token2
+     */
+    private BigInteger distance(BigInteger token1, BigInteger token2) {
+        // token2 > token1
+        if (token2.compareTo(token1) > 0) {
+            return token2.subtract(token1);
+        } else {
+            return token2.subtract(token1).add(partitioner.ringSize);
+        }
+    }
+
+    /** Enum to configure the {@link SplitsGenerator} based on Apache Cassandra partitioners.
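+     *
+     * <p>Murmur3Partitioner, the Cassandra default, assigns tokens in [-2^63, 2^63 - 1] while
+     * RandomPartitioner assigns tokens in [0, 2^127 - 1]; in both cases the ring size is
+     * maxToken - minToken + 1.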
*/ + public enum CassandraPartitioner { + MURMUR3PARTITIONER( + "Murmur3Partitioner", + BigInteger.valueOf(2).pow(63).negate(), + BigInteger.valueOf(2).pow(63).subtract(BigInteger.ONE)), + RANDOMPARTITIONER( + "RandomPartitioner", + BigInteger.ZERO, + BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE)); + + private final BigInteger minToken; + private final BigInteger maxToken; + private final BigInteger ringSize; + private final String className; + + CassandraPartitioner(String className, BigInteger minToken, BigInteger maxToken) { + this.className = className; + this.minToken = minToken; + this.maxToken = maxToken; + this.ringSize = maxToken.subtract(minToken).add(BigInteger.ONE); + } + + public String getClassName() { + return className; + } + } + + private static class TokenRange { + private final long partitionCount; + private final long meanPartitionSize; + private final BigInteger rangeStart; + private final BigInteger rangeEnd; + + private TokenRange( + long partitionCount, long meanPartitionSize, String rangeStart, String rangeEnd) { + this.partitionCount = partitionCount; + this.meanPartitionSize = meanPartitionSize; + this.rangeStart = new BigInteger(rangeStart); + this.rangeEnd = new BigInteger(rangeEnd); + } + } +} diff --git a/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java new file mode 100644 index 0000000..74f25d0 --- /dev/null +++ b/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/utils/BigIntegerSerializationUtils.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.utils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigInteger; + +/** Utils for BigInteger reading and writing in serde context. 
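+ *
+ * <p>The format is a 4-byte length followed by the two's-complement bytes of the BigInteger. A
+ * minimal roundtrip sketch using the flink-core buffers used elsewhere in this commit:
+ *
+ * <pre>{@code
+ * DataOutputSerializer out = new DataOutputSerializer(16);
+ * BigIntegerSerializationUtils.write(BigInteger.TEN, out);
+ * DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ * BigInteger back = BigIntegerSerializationUtils.read(in); // back.equals(BigInteger.TEN)
+ * }</pre>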
*/ +public class BigIntegerSerializationUtils { + public static void write(BigInteger bigInteger, DataOutput output) throws IOException { + final byte[] bigIntegerBytes = bigInteger.toByteArray(); + output.writeInt(bigIntegerBytes.length); + output.write(bigIntegerBytes); + } + + public static BigInteger read(DataInput input) throws IOException { + final int bigIntegerSize = input.readInt(); + final byte[] bigIntegerBytes = new byte[bigIntegerSize]; + input.readFully(bigIntegerBytes); + return new BigInteger(bigIntegerBytes); + } +} diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java index 764c001..441cc09 100644 --- a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat; import org.apache.flink.batch.connectors.cassandra.CassandraPojoOutputFormat; +import org.apache.flink.connectors.cassandra.utils.Pojo; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import com.datastax.driver.core.Cluster; diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java new file mode 100644 index 0000000..83ecaae --- /dev/null +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.cassandra.source; + +import org.apache.flink.connector.cassandra.source.enumerator.CassandraEnumeratorState; +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; +import org.apache.flink.connector.cassandra.source.split.SplitsGenerator; +import org.apache.flink.connector.testframe.environment.ClusterControllable; +import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; +import org.apache.flink.connector.testframe.junit.annotations.TestContext; +import org.apache.flink.connector.testframe.junit.annotations.TestEnv; +import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; +import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; +import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connectors.cassandra.utils.Pojo; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestTemplate; + +import java.util.List; + +import static java.util.concurrent.CompletableFuture.runAsync; +import static org.apache.flink.connector.cassandra.source.CassandraTestContext.CassandraTestContextFactory; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.MURMUR3PARTITIONER; +import static org.apache.flink.connector.cassandra.source.split.SplitsGenerator.CassandraPartitioner.RANDOMPARTITIONER; +import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for the Cassandra source. 
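+ *
+ * <p>In addition to the generic {@code SourceTestSuiteBase} cases, this class tests split
+ * generation against both Cassandra partitioners and both sizing modes: fallback to
+ * {@code parallelism} splits and size-based splitting with {@code maxSplitMemorySize}.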
 */
+class CassandraSourceITCase extends SourceTestSuiteBase<Pojo> {
+
+    @TestEnv MiniClusterTestEnvironment flinkTestEnvironment = new MiniClusterTestEnvironment();
+
+    @TestExternalSystem
+    CassandraTestEnvironment cassandraTestEnvironment = new CassandraTestEnvironment(true);
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    CassandraTestContextFactory contextFactory =
+            new CassandraTestContextFactory(cassandraTestEnvironment);
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with MURMUR3PARTITIONER (default Cassandra partitioner)")
+    public void testGenerateSplitsMurMur3Partitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT);
+        final CassandraEnumeratorState state = generator.prepareSplits();
+
+        // no maxSplitMemorySize specified: falling back to number of splits = parallelism
+        assertThat(state.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+
+        final CassandraSplit split1 = state.getNextSplit();
+        checkNotNull(split1, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split1.splitId()).isEqualTo("(-9223372036854775808,0)");
+
+        final CassandraSplit split2 = state.getNextSplit();
+        checkNotNull(split2, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split2.splitId()).isEqualTo("(0,9223372036854775807)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test basic splitting with RANDOMPARTITIONER")
+    public void testGenerateSplitsRandomPartitioner(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {
+        final int parallelism = 2;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        RANDOMPARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        CassandraSource.MAX_SPLIT_MEMORY_SIZE_DEFAULT);
+        final CassandraEnumeratorState state = generator.prepareSplits();
+
+        // no maxSplitMemorySize specified: falling back to number of splits = parallelism
+        assertThat(state.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+
+        final CassandraSplit split1 = state.getNextSplit();
+        checkNotNull(split1, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split1.splitId()).isEqualTo("(0,85070591730234615865843651857942052864)");
+
+        final CassandraSplit split2 = state.getNextSplit();
+        checkNotNull(split2, "No splits left to generate in CassandraEnumeratorState");
+        assertThat(split2.splitId())
+                .isEqualTo(
+                        "(85070591730234615865843651857942052864,170141183460469231731687303715884105727)");
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a correct split size set")
+    public void testGenerateSplitsWithCorrectSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 2;
+        final long maxSplitMemorySize = 10000L;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        maxSplitMemorySize);
+        final long tableSize = generator.estimateTableSize();
+        // sanity check to ensure that the size estimates were updated in the Cassandra cluster
+        assertThat(tableSize).isEqualTo(35840L);
+        final CassandraEnumeratorState cassandraEnumeratorState = generator.prepareSplits();
+        assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate())
+                // regular case
+                .isEqualTo(tableSize / maxSplitMemorySize);
+    }
+
+    @TestTemplate
+    @DisplayName("Test splitting with a too big split size set")
+    public void testGenerateSplitsWithTooHighMaximumSplitSize(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        final int parallelism = 20;
+        final SplitsGenerator generator =
+                new SplitsGenerator(
+                        MURMUR3PARTITIONER,
+                        cassandraTestEnvironment.getSession(),
+                        CassandraTestEnvironment.KEYSPACE,
+                        CassandraTestEnvironment.SPLITS_TABLE,
+                        parallelism,
+                        100_000_000L);
+        // sanity check to ensure that the size estimates were updated in the Cassandra cluster
+        assertThat(generator.estimateTableSize()).isEqualTo(35840L);
+        final CassandraEnumeratorState cassandraEnumeratorState = generator.prepareSplits();
+        // maxSplitMemorySize is too high compared to the table size, so we fall back to
+        // parallelism splits. A too low maxSplitMemorySize is guarded by an assertion
+        // (> min) at source creation
+        assertThat(cassandraEnumeratorState.getNumSplitsLeftToGenerate()).isEqualTo(parallelism);
+    }
+
+    // overridden to use unordered checks
+    @Override
+    protected void checkResultWithSemantic(
+            CloseableIterator<Pojo> resultIterator,
+            List<List<Pojo>> testData,
+            CheckpointingMode semantic,
+            Integer limit) {
+        if (limit != null) {
+            Runnable runnable =
+                    () ->
+                            CollectIteratorAssertions.assertUnordered(resultIterator)
+                                    .withNumRecordsLimit(limit)
+                                    .matchesRecordsFromSource(testData, semantic);
+
+            assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+        } else {
+            CollectIteratorAssertions.assertUnordered(resultIterator)
+                    .matchesRecordsFromSource(testData, semantic);
+        }
+    }
+
+    @Disabled("Not an unbounded source")
+    @Override
+    public void testSourceMetrics(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {}
+
+    @Disabled("Not an unbounded source")
+    @Override
+    public void testSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {}
+
+    @Disabled("Not an unbounded source")
+    @Override
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {}
+
+    @Disabled("Not an unbounded source")
+    @Override
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            CheckpointingMode semantic) {}
+
+    @Disabled("Not an unbounded source")
+    @Override
+    public void testTaskManagerFailure(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<Pojo> externalContext,
+            ClusterControllable controller,
+            CheckpointingMode semantic) {}
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
new file mode 100644
index 0000000..fb69f2b
--- /dev/null
+++
b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
+import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
+import org.apache.flink.connectors.cassandra.utils.Pojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * JUnit {@link DataStreamSourceExternalContext} that contains everything related to Cassandra
+ * source test cases, especially test table management.
+ */
+public class CassandraTestContext implements DataStreamSourceExternalContext<Pojo> {
+
+    static final String TABLE_NAME = "batches";
+
+    private static final String CREATE_TABLE_QUERY =
+            "CREATE TABLE "
+                    + CassandraTestEnvironment.KEYSPACE
+                    + "."
+                    + TABLE_NAME
+                    + " (id text PRIMARY KEY, counter int, batch_id int)"
+                    + ";";
+
+    private static final String DROP_TABLE_QUERY =
+            "DROP TABLE " + CassandraTestEnvironment.KEYSPACE + "."
+ TABLE_NAME + ";"; + + private static final int RECORDS_PER_SPLIT = 20; + + private final Mapper<Pojo> mapper; + private final MapperOptions mapperOptions; + private final ClusterBuilder clusterBuilder; + private final Session session; + private ExternalSystemSplitDataWriter<Pojo> splitDataWriter; + + public CassandraTestContext(CassandraTestEnvironment cassandraTestEnvironment) { + clusterBuilder = cassandraTestEnvironment.getClusterBuilder(); + session = cassandraTestEnvironment.getSession(); + createTable(); + mapper = new MappingManager(cassandraTestEnvironment.getSession()).mapper(Pojo.class); + mapperOptions = () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)}; + } + + @Override + public TypeInformation<Pojo> getProducedType() { + return TypeInformation.of(Pojo.class); + } + + @Override + public List<URL> getConnectorJarPaths() { + return Collections.emptyList(); + } + + @Override + public Source<Pojo, ?, ?> createSource(TestingSourceSettings sourceSettings) + throws UnsupportedOperationException { + + return new CassandraSource<>( + clusterBuilder, + Pojo.class, + String.format( + "SELECT * FROM %s.%s;", CassandraTestEnvironment.KEYSPACE, TABLE_NAME), + mapperOptions); + } + + @Override + public ExternalSystemSplitDataWriter<Pojo> createSourceSplitDataWriter( + TestingSourceSettings sourceSettings) { + splitDataWriter = + new ExternalSystemSplitDataWriter<Pojo>() { + + @Override + public void writeRecords(List<Pojo> records) { + for (Pojo pojo : records) { + mapper.save(pojo, mapperOptions.getMapperOptions()); + } + } + + @Override + public void close() { + // nothing to do, cluster/session is shared at the CassandraTestEnvironment + // level + } + }; + return splitDataWriter; + } + + @Override + public List<Pojo> generateTestData( + TestingSourceSettings sourceSettings, int splitIndex, long seed) { + List<Pojo> testData = new ArrayList<>(RECORDS_PER_SPLIT); + // generate RECORDS_PER_SPLIT pojos per split and use splitId as pojo batchIndex so that + // pojos are considered equal when they belong to the same split + // as requested in implementation notes. 
+        for (int i = 0; i < RECORDS_PER_SPLIT; i++) {
+            Pojo pojo = new Pojo(String.valueOf(seed + i), i, splitIndex);
+            testData.add(pojo);
+        }
+        return testData;
+    }
+
+    @Override
+    public void close() throws Exception {
+        dropTable();
+        // NB: cluster/session is shared at the CassandraTestEnvironment level
+    }
+
+    private void createTable() {
+        session.execute(CassandraTestEnvironment.requestWithTimeout(CREATE_TABLE_QUERY));
+    }
+
+    private void dropTable() {
+        session.execute(CassandraTestEnvironment.requestWithTimeout(DROP_TABLE_QUERY));
+    }
+
+    static class CassandraTestContextFactory
+            implements ExternalContextFactory<CassandraTestContext> {
+
+        private final CassandraTestEnvironment cassandraTestEnvironment;
+
+        public CassandraTestContextFactory(CassandraTestEnvironment cassandraTestEnvironment) {
+            this.cassandraTestEnvironment = cassandraTestEnvironment;
+        }
+
+        @Override
+        public CassandraTestContext createExternalContext(String testName) {
+            return new CassandraTestContext(cassandraTestEnvironment);
+        }
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
new file mode 100644
index 0000000..24b9e60
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source;
+
+import org.apache.flink.connector.testframe.TestResource;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.Statement;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.OutputFrame;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.CassandraQueryWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.InetSocketAddress;
+
+/**
+ * JUnit test environment that contains everything needed at the test suite level: test container
+ * setup, keyspace setup, Cassandra cluster/session management, ClusterBuilder setup.
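+ *
+ * <p>Note that the environment starts a single-node Cassandra container, so the node owns the
+ * entire token ring and its size_estimates describe the whole "cluster".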
+ */ +@Testcontainers +public class CassandraTestEnvironment implements TestResource { + private static final Logger LOG = LoggerFactory.getLogger(CassandraTestEnvironment.class); + private static final String DOCKER_CASSANDRA_IMAGE = "cassandra:4.0.8"; + private static final int CQL_PORT = 9042; + + private static final int READ_TIMEOUT_MILLIS = 36000; + + // flushing mem table to SS tables is an asynchronous operation that may take a while + private static final long FLUSH_MEMTABLES_DELAY = 30_000L; + + static final String KEYSPACE = "flink"; + + private static final String CREATE_KEYSPACE_QUERY = + "CREATE KEYSPACE " + + KEYSPACE + + " WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; + + static final String SPLITS_TABLE = "flinksplits"; + private static final String CREATE_SPLITS_TABLE_QUERY = + "CREATE TABLE " + KEYSPACE + "." + SPLITS_TABLE + " (id int PRIMARY KEY, counter int);"; + private static final String INSERT_INTO_FLINK_SPLITS = + "INSERT INTO " + KEYSPACE + "." + SPLITS_TABLE + " (id, counter)" + " VALUES (%d, %d)"; + private static final int NB_SPLITS_RECORDS = 1000; + + @Container private final CassandraContainer cassandraContainer; + + boolean insertTestDataForSplitSizeTests; + private Cluster cluster; + private Session session; + private ClusterBuilder clusterBuilder; + + public CassandraTestEnvironment(boolean insertTestDataForSplitSizeTests) { + this.insertTestDataForSplitSizeTests = insertTestDataForSplitSizeTests; + cassandraContainer = new CassandraContainer(DOCKER_CASSANDRA_IMAGE); + // more generous timeouts + addJavaOpts( + cassandraContainer, + "-Dcassandra.request_timeout_in_ms=30000", + "-Dcassandra.read_request_timeout_in_ms=15000", + "-Dcassandra.write_request_timeout_in_ms=6000"); + } + + @Override + public void startUp() throws Exception { + startEnv(); + } + + @Override + public void tearDown() throws Exception { + stopEnv(); + } + + private static void addJavaOpts(GenericContainer<?> container, String... 
+        String jvmOpts = container.getEnvMap().getOrDefault("JVM_OPTS", "");
+        container.withEnv("JVM_OPTS", jvmOpts + " " + StringUtils.join(opts, " "));
+    }
+
+    private void startEnv() throws Exception {
+        // configure container start to wait until cassandra is ready to receive queries
+        cassandraContainer.waitingFor(new CassandraQueryWaitStrategy());
+        // start with retries
+        cassandraContainer.start();
+        cassandraContainer.followOutput(
+                new Slf4jLogConsumer(LOG),
+                OutputFrame.OutputType.END,
+                OutputFrame.OutputType.STDERR,
+                OutputFrame.OutputType.STDOUT);
+
+        cluster = cassandraContainer.getCluster();
+        clusterBuilder =
+                createBuilderWithConsistencyLevel(
+                        ConsistencyLevel.ONE,
+                        cassandraContainer.getHost(),
+                        cassandraContainer.getMappedPort(CQL_PORT));
+
+        session = cluster.connect();
+        session.execute(requestWithTimeout(CREATE_KEYSPACE_QUERY));
+        // create a dedicated table for split size tests (to avoid having to flush with each test)
+        if (insertTestDataForSplitSizeTests) {
+            insertTestDataForSplitSizeTests();
+        }
+    }
+
+    private void insertTestDataForSplitSizeTests() throws Exception {
+        session.execute(requestWithTimeout(CREATE_SPLITS_TABLE_QUERY));
+        for (int i = 0; i < NB_SPLITS_RECORDS; i++) {
+            session.execute(requestWithTimeout(String.format(INSERT_INTO_FLINK_SPLITS, i, i)));
+        }
+        flushMemTables(SPLITS_TABLE);
+    }
+
+    private void stopEnv() {
+        if (session != null) {
+            session.close();
+        }
+        if (cluster != null) {
+            cluster.close();
+        }
+        cassandraContainer.stop();
+    }
+
+    private ClusterBuilder createBuilderWithConsistencyLevel(
+            ConsistencyLevel consistencyLevel, String host, int port) {
+        return new ClusterBuilder() {
+            @Override
+            protected Cluster buildCluster(Cluster.Builder builder) {
+                return builder.addContactPointsWithPorts(new InetSocketAddress(host, port))
+                        .withQueryOptions(
+                                new QueryOptions()
+                                        .setConsistencyLevel(consistencyLevel)
+                                        .setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
+                        .withSocketOptions(
+                                new SocketOptions()
+                                        // default timeout x 3
+                                        .setConnectTimeoutMillis(15000)
+                                        // default timeout x 3 and higher than
+                                        // request_timeout_in_ms at the cluster level
+                                        .setReadTimeoutMillis(READ_TIMEOUT_MILLIS))
+                        .withoutJMXReporting()
+                        .withoutMetrics()
+                        .build();
+            }
+        };
+    }
+
+    /**
+     * Force the flush of Cassandra memtables to SSTables in order to update size_estimates. The
+     * tests need this because they insert records and immediately read the estimates, so Cassandra
+     * must be forced to refresh the size_estimates system table.
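+     *
+     * <p>This is equivalent to running {@code nodetool flush <keyspace> <table>} on the node and
+     * then waiting {@code FLUSH_MEMTABLES_DELAY} for the estimates to be refreshed.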
+ */ + void flushMemTables(String table) throws Exception { + cassandraContainer.execInContainer("nodetool", "flush", KEYSPACE, table); + Thread.sleep(FLUSH_MEMTABLES_DELAY); + } + + static Statement requestWithTimeout(String query) { + return new SimpleStatement(query).setReadTimeoutMillis(READ_TIMEOUT_MILLIS); + } + + public ClusterBuilder getClusterBuilder() { + return clusterBuilder; + } + + public Session getSession() { + return session; + } +} diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java new file mode 100644 index 0000000..0a4a3ec --- /dev/null +++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.cassandra.source.enumerator; + +import org.apache.flink.connector.cassandra.source.split.CassandraSplit; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.util.ArrayDeque; +import java.util.Queue; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CassandraEnumeratorStateSerializer}. 
 */
+class CassandraEnumeratorStateSerializerTest {
+
+    @Test
+    public void testSerdeRoundtrip() throws Exception {
+        final Queue<CassandraSplit> splitsToReassign =
+                new ArrayDeque<>(
+                        ImmutableList.of(
+                                new CassandraSplit(BigInteger.ZERO, BigInteger.TEN),
+                                new CassandraSplit(BigInteger.TEN, BigInteger.ZERO)));
+
+        final CassandraEnumeratorState cassandraEnumeratorState =
+                new CassandraEnumeratorState(
+                        10, BigInteger.ONE, BigInteger.ZERO, BigInteger.TEN, splitsToReassign);
+
+        final byte[] serialized =
+                CassandraEnumeratorStateSerializer.INSTANCE.serialize(cassandraEnumeratorState);
+        final CassandraEnumeratorState deserialized =
+                CassandraEnumeratorStateSerializer.INSTANCE.deserialize(
+                        CassandraEnumeratorStateSerializer.CURRENT_VERSION, serialized);
+        // withFailMessage must be set before the assertion it applies to
+        assertThat(deserialized)
+                .withFailMessage(
+                        "CassandraEnumeratorState is not the same as input object after serde roundtrip")
+                .isEqualTo(cassandraEnumeratorState);
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
new file mode 100644
index 0000000..649eeca
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/reader/CassandraQueryTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.reader;
+
+import org.apache.flink.connector.cassandra.source.CassandraSource;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.regex.Matcher;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for query generation and query sanity checks.
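+ *
+ * <p>A valid query has the shape {@code SELECT ... FROM keyspace.table ...;}. Aggregations,
+ * ORDER BY and GROUP BY are rejected because each generated query only reads a token-range
+ * subset of the table, as the range-query tests below illustrate.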
*/ +class CassandraQueryTest { + + @Test + public void testKeySpaceTableExtractionRegexp() { + Arrays.asList( + "select field FROM keyspace.table where field = value;", + "select * FROM keyspace.table;", + "select field1, field2 from keyspace.table;", + "select field1, field2 from keyspace.table LIMIT(1000);", + "select field1 from keyspace.table ;", + "select field1 from keyspace.table where field1=1;") + .forEach(CassandraQueryTest::assertQueryFormatCorrect); + + Arrays.asList( + "select field1 from table;", // missing keyspace + "select field1 from .table", // undefined keyspace var in a script + "select field1 from keyspace.;", // undefined table var in a script + "select field1 from keyspace.table" // missing ";" + ) + .forEach(CassandraQueryTest::assertQueryFormatIncorrect); + } + + @Test + public void testProhibitedClauses() { + Arrays.asList( + "SELECT COUNT(*) from flink.table;", + "SELECT AVG(*) from flink.table;", + "SELECT MIN(*) from flink.table;", + "SELECT MAX(*) from flink.table;", + "SELECT SUM(*) from flink.table;", + "SELECT field1, field2 from flink.table ORDER BY field1;", + "SELECT field1, field2 from flink.table GROUP BY field1;") + .forEach(CassandraQueryTest::assertProhibitedClauseRejected); + } + + @Test + public void testGenerateRangeQuery() { + String query; + String outputQuery; + + // query with where clause + query = "SELECT field FROM keyspace.table WHERE field = value;"; + outputQuery = CassandraSplitReader.generateRangeQuery(query, "field"); + assertThat(outputQuery) + .isEqualTo( + "SELECT field FROM keyspace.table WHERE (token(field) >= ?) AND (token(field) < ?) AND field = value;"); + + // query without where clause + query = "SELECT * FROM keyspace.table;"; + outputQuery = CassandraSplitReader.generateRangeQuery(query, "field"); + assertThat(outputQuery) + .isEqualTo( + "SELECT * FROM keyspace.table WHERE (token(field) >= ?) AND (token(field) < ?);"); + + // query without where clause but with another trailing clause + query = "SELECT field FROM keyspace.table LIMIT(1000);"; + outputQuery = CassandraSplitReader.generateRangeQuery(query, "field"); + assertThat(outputQuery) + .isEqualTo( + "SELECT field FROM keyspace.table WHERE (token(field) >= ?) AND (token(field) < ?) LIMIT(1000);"); + + // query with where clause and another trailing clause + query = "SELECT field FROM keyspace.table WHERE field = value LIMIT(1000);"; + outputQuery = CassandraSplitReader.generateRangeQuery(query, "field"); + assertThat(outputQuery) + .isEqualTo( + "SELECT field FROM keyspace.table WHERE (token(field) >= ?) AND (token(field) < ?) AND field = value LIMIT(1000);"); + } + + private static void assertQueryFormatIncorrect(String query) { + assertThatThrownBy(() -> CassandraSource.checkQueryValidity(query)) + .hasMessageContaining( + "Query must be of the form select ... 
from keyspace.table ...;");
+    }
+
+    private static void assertQueryFormatCorrect(String query) {
+        Matcher matcher = CassandraSource.SELECT_REGEXP.matcher(query);
+        assertThat(matcher.matches()).isTrue();
+        assertThat(matcher.group(1)).isEqualTo("keyspace");
+        assertThat(matcher.group(2)).isEqualTo("table");
+    }
+
+    private static void assertProhibitedClauseRejected(String query) {
+        assertThatThrownBy(() -> CassandraSource.checkQueryValidity(query))
+                .hasMessageContaining(
+                        "Aggregations/OrderBy are not supported because the query is executed on subsets/partitions of the input table");
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
new file mode 100644
index 0000000..b79fba6
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializerTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CassandraSplitSerializer}. */
+class CassandraSplitSerializerTest {
+
+    @Test
+    public void testSerdeRoundtrip() throws IOException {
+        final CassandraSplit testData = new CassandraSplit(BigInteger.ONE, BigInteger.TEN);
+        final byte[] serialized = CassandraSplitSerializer.INSTANCE.serialize(testData);
+        final CassandraSplit deserialized =
+                CassandraSplitSerializer.INSTANCE.deserialize(
+                        CassandraSplitSerializer.CURRENT_VERSION, serialized);
+        // withFailMessage must be set before the assertion it applies to
+        assertThat(deserialized)
+                .withFailMessage(
+                        "CassandraSplit is not the same as input object after serde roundtrip")
+                .isEqualTo(testData);
+    }
+}
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java b/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
similarity index 73%
rename from flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
rename to flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
index 559f107..a32e492 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/Pojo.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/connectors/cassandra/utils/Pojo.java
@@ -15,12 +15,13 @@
  * limitations under the License.
*/ -package org.apache.flink.batch.connectors.cassandra.example; +package org.apache.flink.connectors.cassandra.utils; import com.datastax.driver.mapping.annotations.Column; import com.datastax.driver.mapping.annotations.Table; import java.io.Serializable; +import java.util.Objects; /** Test Pojo with DataStax annotations used. */ @Table(keyspace = "flink", name = "batches") @@ -69,4 +70,27 @@ public class Pojo implements Serializable { public void setBatchID(int batchId) { this.batchID = batchId; } + + @Override + public String toString() { + return String.format( + "{\"id\":\"%s\", \"counter\":%d, \"batchID\":%d}", id, counter, batchID); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Pojo pojo = (Pojo) o; + return counter == pojo.counter && batchID == pojo.batchID && id.equals(pojo.id); + } + + @Override + public int hashCode() { + return Objects.hash(id, counter, batchID); + } } diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index 8cf47a1..770d68b 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -25,7 +25,7 @@ under the License. <suppressions> <!-- Cassandra connectors have to use guava directly --> <suppress - files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|OutputFormatBase.java|OutputFormatBaseTest.java|CassandraColumnarOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java" + files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormatBase.java|OutputFormatBase.java|OutputFormatBaseTest.java|CassandraColumnarOutputFormatBase.java|CassandraSinkBase.java|CassandraSinkBaseTest.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java|CassandraPojoOutputFormat.java|CassandraRecordEmitter" checks="IllegalImport"/> </suppressions>
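
For reference, a minimal end-to-end usage sketch of the source added by this commit, assembled from the test code above (the contact point, keyspace and table names are placeholder values, not part of the commit):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.cassandra.source.CassandraSource;
import org.apache.flink.connectors.cassandra.utils.Pojo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.mapping.Mapper;

import java.net.InetSocketAddress;

public class CassandraSourceUsageSketch {
    public static void main(String[] args) throws Exception {
        // contact point is a placeholder; mirrors createBuilderWithConsistencyLevel above
        ClusterBuilder clusterBuilder =
                new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return builder
                                .addContactPointsWithPorts(new InetSocketAddress("localhost", 9042))
                                .build();
                    }
                };

        // same constructor as used by CassandraTestContext#createSource
        CassandraSource<Pojo> source =
                new CassandraSource<>(
                        clusterBuilder,
                        Pojo.class,
                        "SELECT * FROM flink.batches;",
                        () -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)});

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Pojo> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "CassandraSource");
        stream.print();
        env.execute();
    }
}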