Repository: flink
Updated Branches:
  refs/heads/master 0df8e0797 -> 1809cad6d
[FLINK-6225] [cassandra] Add a CassandraAppendTableSink.

This closes #3748.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1809cad6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1809cad6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1809cad6

Branch: refs/heads/master
Commit: 1809cad6d05522d6185a69ca14ddc275d5ebbbf1
Parents: 0df8e07
Author: Jing Fan <[email protected]>
Authored: Thu Apr 20 17:54:36 2017 -0700
Committer: Fabian Hueske <[email protected]>
Committed: Wed Nov 1 14:53:09 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  44 ++++-
 .../flink-connector-cassandra/pom.xml           |   6 +
 .../cassandra/CassandraAppendTableSink.java     |  88 ++++++++++
 .../connectors/cassandra/CassandraRowSink.java  |  42 +++++
 .../cassandra/CassandraRowWriteAheadSink.java   | 162 +++++++++++++++++++
 .../connectors/cassandra/CassandraSink.java     |  40 ++++-
 .../cassandra/CassandraConnectorITCase.java     |  75 ++++++++-
 .../java/typeutils/runtime/RowSerializer.java   |   7 +
 tools/maven/suppressions.xml                    |   2 +-
 9 files changed, 454 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 43542f3..dfa7954 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -492,7 +492,8 @@ The following table lists the `TableSink`s which are provided with Flink.
 | **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description**
 | `CsvTableSink` | `flink-table` | Y | Append | A simple sink for CSV files.
-| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes tables to a JDBC database.
+| `JDBCAppendTableSink` | `flink-jdbc` | Y | Append | Writes a Table to a JDBC table.
+| `CassandraAppendTableSink` | `flink-connector-cassandra` | N | Append | Writes a Table to a Cassandra table.
 | `Kafka08JsonTableSink` | `flink-connector-kafka-0.8` | N | Append | A Kafka 0.8 sink with JSON encoding.
 | `Kafka09JsonTableSink` | `flink-connector-kafka-0.9` | N | Append | A Kafka 0.9 sink with JSON encoding.

@@ -583,6 +584,47 @@ Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify t
 
 {% top %}
 
+### CassandraAppendTableSink
+
+The `CassandraAppendTableSink` emits a `Table` to a Cassandra table. The sink only supports append-only streaming tables. It cannot be used to emit a `Table` that is continuously updated. See the [documentation on Table to Stream conversions](./streaming.html#table-to-stream-conversion) for details.
+
+The `CassandraAppendTableSink` inserts all rows at least once into the Cassandra table if checkpointing is enabled. However, you can specify the query as an upsert query, in which case replayed rows simply overwrite the previous write for the same primary key.
+
+To use the `CassandraAppendTableSink`, you have to add the Cassandra connector dependency (<code>flink-connector-cassandra</code>) to your project. The example below shows how to use the `CassandraAppendTableSink`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+ClusterBuilder builder = ... // configure Cassandra cluster connection
+
+CassandraAppendTableSink sink = new CassandraAppendTableSink(
+    builder,
+    // the query must match the schema of the table
+    "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
+
+Table table = ...
+table.writeToSink(sink);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val builder: ClusterBuilder = ... // configure Cassandra cluster connection
+
+val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
+  builder,
+  // the query must match the schema of the table
+  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")
+
+val table: Table = ???
+table.writeToSink(sink)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 Define a TableSource
 --------------------
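For orientation, a minimal sketch of the new sink wired end to end (modeled on the
testCassandraTableSink IT case added below; the input stream, the keyspace/table,
and a pre-built ClusterBuilder are assumed, not part of this commit):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

    DataStream<Row> rows = ...; // some append-only stream of Rows
    tEnv.registerDataStream("myFlinkTable", rows);

    tEnv.sql("SELECT * FROM myFlinkTable").writeToSink(
        new CassandraAppendTableSink(
            builder, // a pre-configured ClusterBuilder
            "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)"));

    env.execute();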
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/pom.xml b/flink-connectors/flink-connector-cassandra/pom.xml
index c97b43f..3c1d3e1 100644
--- a/flink-connectors/flink-connector-cassandra/pom.xml
+++ b/flink-connectors/flink-connector-cassandra/pom.xml
@@ -197,5 +197,11 @@ under the License.
 			</exclusion>
 		</exclusions>
 	</dependency>
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-table_${scala.binary.version}</artifactId>
+		<version>${project.version}</version>
+		<scope>provided</scope>
+	</dependency>
 </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
new file mode 100644
index 0000000..395ff9a
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Properties;
+
+/**
+ * An {@link AppendStreamTableSink} to write an append stream Table to a Cassandra table.
+ */
+public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
+
+	private final ClusterBuilder builder;
+	private final String cql;
+	private String[] fieldNames;
+	private TypeInformation[] fieldTypes;
+	private final Properties properties;
+
+	public CassandraAppendTableSink(ClusterBuilder builder, String cql) {
+		this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+		this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+		this.properties = new Properties();
+	}
+
+	public CassandraAppendTableSink(ClusterBuilder builder, String cql, Properties properties) {
+		this.builder = Preconditions.checkNotNull(builder, "ClusterBuilder must not be null.");
+		this.cql = Preconditions.checkNotNull(cql, "CQL query must not be null.");
+		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+	}
+
+	@Override
+	public TypeInformation<Row> getOutputType() {
+		return new RowTypeInfo(fieldTypes);
+	}
+
+	@Override
+	public String[] getFieldNames() {
+		return this.fieldNames;
+	}
+
+	@Override
+	public TypeInformation<?>[] getFieldTypes() {
+		return this.fieldTypes;
+	}
+
+	@Override
+	public CassandraAppendTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		CassandraAppendTableSink cassandraTableSink = new CassandraAppendTableSink(this.builder, this.cql, this.properties);
+		cassandraTableSink.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names must not be null.");
+		cassandraTableSink.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types must not be null.");
+		Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
+			"Number of provided field names and types does not match.");
+		return cassandraTableSink;
+	}
+
+	@Override
+	public void emitDataStream(DataStream<Row> dataStream) {
+		try {
+			CassandraSink.addSink(dataStream)
+				.setClusterBuilder(this.builder)
+				.setQuery(this.cql)
+				.build();
+		} catch (Exception e) {
+			throw new RuntimeException(e);
+		}
+	}
+}
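Note that configure() is invoked by the Table API runtime rather than by user code,
and that it returns a configured copy instead of mutating the receiver. A sketch of
that contract (the field names and types below are illustrative):

    CassandraAppendTableSink sink = new CassandraAppendTableSink(
        builder, "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

    // the planner passes in the schema of the Table being emitted;
    // 'sink' itself stays unconfigured
    CassandraAppendTableSink configured = sink.configure(
        new String[] {"id", "name", "value"},
        new TypeInformation<?>[] {
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO});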
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
new file mode 100644
index 0000000..fbbeb96
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowSink.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.types.Row;
+
+/**
+ * A SinkFunction to write Row records into a Cassandra table.
+ */
+public class CassandraRowSink extends AbstractCassandraTupleSink<Row> {
+
+	private final int rowArity;
+
+	public CassandraRowSink(int rowArity, String insertQuery, ClusterBuilder builder) {
+		super(insertQuery, builder);
+		this.rowArity = rowArity;
+	}
+
+	@Override
+	protected Object[] extract(Row record) {
+		Object[] al = new Object[rowArity];
+		for (int i = 0; i < rowArity; i++) {
+			al[i] = record.getField(i);
+		}
+		return al;
+	}
+}
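The sink binds Row fields to the query's `?` placeholders strictly by position, so
the field order of each Row must match the column list of the insert statement. A
sketch of direct (at-least-once) use, following the testCassandraRowAtLeastOnceSink
IT case further below (keyspace, table, and ClusterBuilder are illustrative):

    CassandraRowSink sink = new CassandraRowSink(
        3, // arity of each incoming Row
        "INSERT INTO flink.mytable (id, counter, batch_id) VALUES (?, ?, ?)",
        builder); // an existing ClusterBuilder

    sink.open(new Configuration());
    sink.send(Row.of("some-id", 42, 0)); // binds positionally: id, counter, batch_id
    sink.close();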
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
new file mode 100644
index 0000000..6b3d418
--- /dev/null
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraRowWriteAheadSink.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.Row;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Sink that emits its input elements into a Cassandra table. This sink stores incoming records within a
+ * {@link org.apache.flink.runtime.state.AbstractStateBackend}, and only commits them to Cassandra
+ * if a checkpoint is completed.
+ */
+public class CassandraRowWriteAheadSink extends GenericWriteAheadSink<Row> {
+	private static final long serialVersionUID = 1L;
+
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	private final String insertQuery;
+	private transient PreparedStatement preparedStatement;
+
+	private ClusterBuilder builder;
+
+	private int arity;
+	private transient Object[] fields;
+
+	protected CassandraRowWriteAheadSink(String insertQuery, TypeSerializer<Row> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception {
+		super(committer, serializer, UUID.randomUUID().toString().replace("-", "_"));
+		this.insertQuery = insertQuery;
+		this.builder = builder;
+		ClosureCleaner.clean(builder, true);
+	}
+
+	public void open() throws Exception {
+		super.open();
+		if (!getRuntimeContext().isCheckpointingEnabled()) {
+			throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
+		}
+		cluster = builder.getCluster();
+		session = cluster.connect();
+		preparedStatement = session.prepare(insertQuery);
+
+		arity = ((RowSerializer) serializer).getArity();
+		fields = new Object[arity];
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		try {
+			if (session != null) {
+				session.close();
+			}
+		} catch (Exception e) {
+			LOG.error("Error while closing session.", e);
+		}
+		try {
+			if (cluster != null) {
+				cluster.close();
+			}
+		} catch (Exception e) {
+			LOG.error("Error while closing cluster.", e);
+		}
+	}
+	@Override
+	protected boolean sendValues(Iterable<Row> values, long checkpointId, long timestamp) throws Exception {
+		final AtomicInteger updatesCount = new AtomicInteger(0);
+		final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+		final AtomicReference<Throwable> exception = new AtomicReference<>();
+
+		FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() {
+			@Override
+			public void onSuccess(ResultSet resultSet) {
+				updatesConfirmed.incrementAndGet();
+				if (updatesCount.get() > 0) { // only set if all updates have been sent
+					if (updatesCount.get() == updatesConfirmed.get()) {
+						synchronized (updatesConfirmed) {
+							updatesConfirmed.notifyAll();
+						}
+					}
+				}
+			}
+
+			@Override
+			public void onFailure(Throwable throwable) {
+				if (exception.compareAndSet(null, throwable)) {
+					LOG.error("Error while sending value.", throwable);
+					synchronized (updatesConfirmed) {
+						updatesConfirmed.notifyAll();
+					}
+				}
+			}
+		};
+
+		//set values for prepared statement
+		int updatesSent = 0;
+		for (Row value : values) {
+			for (int x = 0; x < arity; x++) {
+				fields[x] = value.getField(x);
+			}
+			//insert values and send to cassandra
+			BoundStatement s = preparedStatement.bind(fields);
+			s.setDefaultTimestamp(timestamp);
+			ResultSetFuture result = session.executeAsync(s);
+			updatesSent++;
+			if (result != null) {
+				//add callback to detect errors
+				Futures.addCallback(result, callback);
+			}
+		}
+		updatesCount.set(updatesSent);
+
+		synchronized (updatesConfirmed) {
+			while (exception.get() == null && updatesSent != updatesConfirmed.get()) {
+				updatesConfirmed.wait();
+			}
+		}
+
+		if (exception.get() != null) {
+			LOG.warn("Sending a value failed.", exception.get());
+			return false;
+		} else {
+			return true;
+		}
+	}
+}
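The write-ahead path is enabled through the regular builder API; note that open()
above refuses to start when checkpointing is off. A sketch (assuming a
DataStream<Row> 'rows' that carries RowTypeInfo and a ClusterBuilder 'builder'):

    env.enableCheckpointing(5000); // required by the write-ahead log

    CassandraSink.addSink(rows)
        .setClusterBuilder(builder)
        .setQuery("INSERT INTO flink.mytable (id, counter, batch_id) VALUES (?, ?, ?)")
        .enableWriteAheadLog() // routes to CassandraRowWriteAheadSink (see builder below)
        .build();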
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 29c4b21..3543378 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -32,6 +33,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 
@@ -205,12 +207,16 @@ public class CassandraSink<IN> {
 	 * @param <IN> input type
 	 * @return CassandraSinkBuilder, to further configure the sink
 	 */
-	public static <IN, T extends Tuple> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
+	public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
 		TypeInformation<IN> typeInfo = input.getType();
 		if (typeInfo instanceof TupleTypeInfo) {
-			DataStream<T> tupleInput = (DataStream<T>) input;
+			DataStream<Tuple> tupleInput = (DataStream<Tuple>) input;
 			return (CassandraSinkBuilder<IN>) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
 		}
+		if (typeInfo instanceof RowTypeInfo) {
+			DataStream<Row> rowInput = (DataStream<Row>) input;
+			return (CassandraSinkBuilder<IN>) new CassandraRowSinkBuilder(rowInput, rowInput.getType(), rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
+		}
 		if (typeInfo instanceof PojoTypeInfo) {
 			return new CassandraPojoSinkBuilder<>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
 		}
@@ -391,6 +397,36 @@ public class CassandraSink<IN> {
 	}
 
 	/**
+	 * Builder for a {@link CassandraRowSink}.
+	 */
+	public static class CassandraRowSinkBuilder extends CassandraSinkBuilder<Row> {
+		public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
+			super(input, typeInfo, serializer);
+		}
+
+		@Override
+		protected void sanityCheck() {
+			super.sanityCheck();
+			if (query == null || query.length() == 0) {
+				throw new IllegalArgumentException("Query must not be null or empty.");
+			}
+		}
+
+		@Override
+		protected CassandraSink<Row> createSink() throws Exception {
+			return new CassandraSink<>(input.addSink(new CassandraRowSink(typeInfo.getArity(), query, builder)).name("Cassandra Sink"));
+		}
+
+		@Override
+		protected CassandraSink<Row> createWriteAheadSink() throws Exception {
+			return committer == null
+				? new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, new CassandraCommitter(builder))))
+				: new CassandraSink<>(input.transform("Cassandra Sink", null, new CassandraRowWriteAheadSink(query, serializer, builder, committer)));
+		}
+	}
+
+	/**
 	 * Builder for a {@link CassandraPojoSink}.
 	 * @param <IN>
 	 */
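Since addSink now dispatches on the stream's TypeInformation, a Row stream has to
actually carry RowTypeInfo to reach the new branch; declaring the type explicitly
is the safe route, because extraction from Row instances cannot infer field types
in general (e.g. when fields are null). A sketch with illustrative field types:

    DataStream<Row> rows = env.fromCollection(
        rowCollection, // an assumed Collection<Row>
        new RowTypeInfo(
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO));

    CassandraSink.addSink(rows) // picks CassandraRowSinkBuilder
        .setClusterBuilder(builder)
        .setQuery("INSERT INTO flink.mytable (id, counter, batch_id) VALUES (?, ?, ?)")
        .build();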
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f52a42c..f1b598f 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -35,15 +35,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.types.Row;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.junit.AfterClass;
@@ -58,6 +60,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -83,11 +86,15 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 	private static EmbeddedCassandraService cassandra;
 
+	private static final String HOST = "127.0.0.1";
+
+	private static final int PORT = 9042;
+
 	private static ClusterBuilder builder = new ClusterBuilder() {
 		@Override
 		protected Cluster buildCluster(Cluster.Builder builder) {
 			return builder
-				.addContactPoint("127.0.0.1")
+				.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT))
 				.withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE).setSerialConsistencyLevel(ConsistencyLevel.LOCAL_SERIAL))
 				.withoutJMXReporting()
 				.withoutMetrics().build();
@@ -108,10 +115,16 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	private int tableID;
 
 	private static final ArrayList<Tuple3<String, Integer, Integer>> collection = new ArrayList<>(20);
+	private static final ArrayList<Row> rowCollection = new ArrayList<>(20);
+
+	private static final String[] FIELD_NAMES = {"id", "counter", "batch_id"};
+	private static final TypeInformation[] FIELD_TYPES = {
+		BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO};
 
 	static {
 		for (int i = 0; i < 20; i++) {
 			collection.add(new Tuple3<>(UUID.randomUUID().toString(), i, 0));
+			rowCollection.add(Row.of(UUID.randomUUID().toString(), i, 0));
 		}
 	}
@@ -245,7 +258,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			list.add(x);
 		}
 
-		for (Row s : result) {
+		for (com.datastax.driver.core.Row s : result) {
 			list.remove(new Integer(s.getInt("counter")));
 		}
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -260,7 +273,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			list.add(x);
 		}
 
-		for (Row s : result) {
+		for (com.datastax.driver.core.Row s : result) {
 			list.remove(new Integer(s.getInt("counter")));
 		}
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -278,7 +291,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 			list.add(x);
 		}
 
-		for (Row s : result) {
+		for (com.datastax.driver.core.Row s : result) {
 			list.remove(new Integer(s.getInt("counter")));
 		}
 		Assert.assertTrue("The following ID's were not found in the ResultSet: " + list.toString(), list.isEmpty());
@@ -300,7 +313,8 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		ArrayList<Integer> actual = new ArrayList<>();
 		ResultSet result = session.execute(injectTableName(SELECT_DATA_QUERY));
-		for (Row s : result) {
+
+		for (com.datastax.driver.core.Row s : result) {
 			actual.add(s.getInt("counter"));
 		}
@@ -380,6 +394,22 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Test
+	public void testCassandraRowAtLeastOnceSink() throws Exception {
+		CassandraRowSink sink = new CassandraRowSink(FIELD_TYPES.length, injectTableName(INSERT_DATA_QUERY), builder);
+
+		sink.open(new Configuration());
+
+		for (Row value : rowCollection) {
+			sink.send(value);
+		}
+
+		sink.close();
+
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+		Assert.assertEquals(20, rs.all().size());
+	}
+
+	@Test
 	public void testCassandraPojoAtLeastOnceSink() throws Exception {
 		session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, "test"));
@@ -398,6 +428,35 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 	}
 
 	@Test
+	public void testCassandraTableSink() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(4);
+		StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);
+
+		DataStreamSource<Row> source = env.fromCollection(rowCollection);
+
+		tEnv.registerDataStreamInternal("testFlinkTable", source);
+
+		tEnv.sql("select * from testFlinkTable").writeToSink(
+			new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));
+
+		env.execute();
+		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+
+		// validate that all input was correctly written to Cassandra
+		List<Row> input = new ArrayList<>(rowCollection);
+		List<com.datastax.driver.core.Row> output = rs.all();
+		for (com.datastax.driver.core.Row o : output) {
+			Row cmp = new Row(3);
+			cmp.setField(0, o.getString(0));
+			cmp.setField(1, o.getInt(2));
+			cmp.setField(2, o.getInt(1));
+			Assert.assertTrue("Row " + cmp + " was written to Cassandra but not in input.", input.remove(cmp));
+		}
+		Assert.assertTrue("The input data was not completely written to Cassandra", input.isEmpty());
+	}
+
+	@Test
 	public void testCassandraBatchFormats() throws Exception {
 		OutputFormat<Tuple3<String, Integer, Integer>> sink = new CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
 		sink.configure(new Configuration());
@@ -465,10 +524,10 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 		sink.close();
 
 		ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
-		List<Row> rows = rs.all();
+		List<com.datastax.driver.core.Row> rows = rs.all();
 		Assert.assertEquals(scalaTupleCollection.size(), rows.size());
 
-		for (Row row : rows) {
+		for (com.datastax.driver.core.Row row : rows) {
 			scalaTupleCollection.remove(new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
 		}
 		Assert.assertEquals(0, scalaTupleCollection.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
index bd08b04..7f9cc21 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
@@ -50,11 +50,14 @@ public final class RowSerializer extends TypeSerializer<Row> {
 
 	private final TypeSerializer<Object>[] fieldSerializers;
 
+	private final int arity;
+
 	private transient boolean[] nullMask;
 
 	@SuppressWarnings("unchecked")
 	public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
 		this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
+		this.arity = fieldSerializers.length;
 		this.nullMask = new boolean[fieldSerializers.length];
 	}
 
@@ -135,6 +138,10 @@ public final class RowSerializer extends TypeSerializer<Row> {
 		return -1;
 	}
 
+	public int getArity() {
+		return arity;
+	}
+
 	@Override
 	public void serialize(Row record, DataOutputView target) throws IOException {
 		int len = fieldSerializers.length;
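The new getArity() accessor lets CassandraRowWriteAheadSink#open() size its
reusable bind buffer from the serializer alone (see above). A minimal
illustration, assuming a two-field row type:

    TypeSerializer<Row> serializer =
        new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
            .createSerializer(new ExecutionConfig());

    int arity = ((RowSerializer) serializer).getArity(); // 2
    Object[] fields = new Object[arity]; // buffer reused for each bound statement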
http://git-wip-us.apache.org/repos/asf/flink/blob/1809cad6/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index d897137..5d5c455 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -30,7 +30,7 @@ under the License.
 
 	<!-- Cassandra connectors have to use guava directly -->
 	<suppress
-		files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraTupleWriteAheadSink.java"
+		files="AbstractCassandraTupleSink.java|CassandraInputFormat.java|CassandraOutputFormat.java|CassandraSinkBase.java|CassandraPojoSink.java|CassandraRowSink.java|CassandraTupleWriteAheadSink.java|CassandraRowWriteAheadSink.java"
 		checks="IllegalImport"/>
 
 	<!-- Kinesis producer has to use guava directly -->
 	<suppress
