Repository: flink Updated Branches: refs/heads/master 7d026aa72 -> eaa5a4668
[FLINK-6314] [cassandra] Support user-defined Mapper options This closes #4831. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eaa5a466 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eaa5a466 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eaa5a466 Branch: refs/heads/master Commit: eaa5a4668d0b64bf67111e50765cc4dbf739f0e9 Parents: 2b7d6d5 Author: zentol <ches...@apache.org> Authored: Wed Apr 12 11:25:41 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Oct 18 12:51:00 2017 +0200 ---------------------------------------------------------------------- docs/dev/connectors/cassandra.md | 8 +++-- .../connectors/cassandra/CassandraPojoSink.java | 14 ++++++++ .../connectors/cassandra/CassandraSink.java | 18 +++++++++- .../connectors/cassandra/MapperOptions.java | 36 ++++++++++++++++++++ .../example/CassandraPojoSinkExample.java | 2 ++ 5 files changed, 75 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/docs/dev/connectors/cassandra.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md index cfb31b3..5a104ac 100644 --- a/docs/dev/connectors/cassandra.md +++ b/docs/dev/connectors/cassandra.md @@ -69,10 +69,13 @@ The following configuration methods can be used: * Sets the cluster builder that is used to configure the connection to cassandra with more sophisticated settings such as consistency level, retry policy and etc. 3. _setHost(String host[, int port])_ * Simple version of setClusterBuilder() with host/port information to connect to Cassandra instances -4. _enableWriteAheadLog([CheckpointCommitter committer])_ +4. _setMapperOptions(MapperOptions options)_ + * Sets the mapper options that are used to configure the DataStax ObjectMapper. 
+ * Only applies when processing __POJO__ data types. +5. _enableWriteAheadLog([CheckpointCommitter committer])_ * An __optional__ setting * Allows exactly-once processing for non-deterministic algorithms. -5. _build()_ +6. _build()_ * Finalizes the configuration and constructs the CassandraSink instance. ### Write-ahead Log @@ -236,6 +239,7 @@ DataStream<WordCount> result = text CassandraSink.addSink(result) .setHost("127.0.0.1") + .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}) .build(); http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java index c9b29b8..e060ce0 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java @@ -24,6 +24,8 @@ import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import com.google.common.util.concurrent.ListenableFuture; +import javax.annotation.Nullable; + /** * Flink Sink to save data into a Cassandra cluster using * <a href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html">Mapper</a>, @@ -38,6 +40,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> { private static final long serialVersionUID = 1L; protected final Class<IN> clazz; + private final MapperOptions options; protected transient Mapper<IN> mapper; protected 
transient MappingManager mappingManager; @@ -47,8 +50,13 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> { * @param clazz Class instance */ public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) { + this(clazz, builder, null); + } + + public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, @Nullable MapperOptions options) { super(builder); this.clazz = clazz; + this.options = options; } @Override @@ -57,6 +65,12 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> { try { this.mappingManager = new MappingManager(session); this.mapper = mappingManager.mapper(clazz); + if (options != null) { + Mapper.Option[] optionsArray = options.getMapperOptions(); + if (optionsArray != null) { + this.mapper.setDefaultSaveOptions(optionsArray); + } + } } catch (Exception e) { throw new RuntimeException("Cannot create CassandraPojoSink with input: " + clazz.getSimpleName(), e); } http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java index f902d56..29c4b21 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java @@ -230,6 +230,7 @@ public class CassandraSink<IN> { protected final TypeSerializer<IN> serializer; protected final TypeInformation<IN> typeInfo; protected ClusterBuilder builder; + protected MapperOptions mapperOptions; protected String 
query; protected CheckpointCommitter committer; protected boolean isWriteAheadLogEnabled; @@ -321,6 +322,21 @@ public class CassandraSink<IN> { } /** + * Sets the mapper options for this sink. The mapper options are used to configure the DataStax + * {@link com.datastax.driver.mapping.Mapper} when writing POJOs. + * + * <p>This call has no effect if the input {@link DataStream} for this sink does not contain POJOs. + * + * @param options MapperOptions, that return an array of options that are used to configure the DataStax mapper. + * + * @return this builder + */ + public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options) { + this.mapperOptions = options; + return this; + } + + /** * Finalizes the configuration of this sink. * * @return finalized sink @@ -393,7 +409,7 @@ public class CassandraSink<IN> { @Override public CassandraSink<IN> createSink() throws Exception { - return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink")); + return new CassandraSink<>(input.addSink(new CassandraPojoSink<>(typeInfo.getTypeClass(), builder, mapperOptions)).name("Cassandra Sink")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java new file mode 100644 index 0000000..7c2e1be --- /dev/null +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.mapping.Mapper; + +import java.io.Serializable; + +/** + * This class is used to configure a {@link com.datastax.driver.mapping.Mapper} after deployment. + */ +public interface MapperOptions extends Serializable { + + /** + * Returns an array of {@link com.datastax.driver.mapping.Mapper.Option} that are used to configure the mapper. + * + * @return array of options used to configure the mapper. 
+ */ + Mapper.Option[] getMapperOptions(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java index 01cd6e8..89e2d9e 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java @@ -24,6 +24,7 @@ import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Cluster.Builder; +import com.datastax.driver.mapping.Mapper; import java.util.ArrayList; @@ -57,6 +58,7 @@ public class CassandraPojoSinkExample { return builder.addContactPoint("127.0.0.1").build(); } }) + .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)}) .build(); env.execute("Cassandra Sink example");