Repository: flink
Updated Branches:
  refs/heads/master 7d026aa72 -> eaa5a4668


[FLINK-6314] [cassandra] Support user-defined Mapper options

This closes #4831.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eaa5a466
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eaa5a466
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eaa5a466

Branch: refs/heads/master
Commit: eaa5a4668d0b64bf67111e50765cc4dbf739f0e9
Parents: 2b7d6d5
Author: zentol <ches...@apache.org>
Authored: Wed Apr 12 11:25:41 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Oct 18 12:51:00 2017 +0200

----------------------------------------------------------------------
 docs/dev/connectors/cassandra.md                |  8 +++--
 .../connectors/cassandra/CassandraPojoSink.java | 14 ++++++++
 .../connectors/cassandra/CassandraSink.java     | 18 +++++++++-
 .../connectors/cassandra/MapperOptions.java     | 36 ++++++++++++++++++++
 .../example/CassandraPojoSinkExample.java       |  2 ++
 5 files changed, 75 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/docs/dev/connectors/cassandra.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/cassandra.md b/docs/dev/connectors/cassandra.md
index cfb31b3..5a104ac 100644
--- a/docs/dev/connectors/cassandra.md
+++ b/docs/dev/connectors/cassandra.md
@@ -69,10 +69,13 @@ The following configuration methods can be used:
     * Sets the cluster builder that is used to configure the connection to 
cassandra with more sophisticated settings such as consistency level, retry 
policy and etc.
 3. _setHost(String host[, int port])_
     * Simple version of setClusterBuilder() with host/port information to 
connect to Cassandra instances
-4. _enableWriteAheadLog([CheckpointCommitter committer])_
+4. _setMapperOptions(MapperOptions options)_
+    * Sets the mapper options that are used to configure the DataStax 
ObjectMapper.
+    * Only applies when processing __POJO__ data types.
+5. _enableWriteAheadLog([CheckpointCommitter committer])_
     * An __optional__ setting
     * Allows exactly-once processing for non-deterministic algorithms.
-5. _build()_
+6. _build()_
     * Finalizes the configuration and constructs the CassandraSink instance.
 
 ### Write-ahead Log
@@ -236,6 +239,7 @@ DataStream<WordCount> result = text
 
 CassandraSink.addSink(result)
         .setHost("127.0.0.1")
+        .setMapperOptions(() -> new 
Mapper.Option[]{Mapper.Option.saveNullFields(true)})
         .build();
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index c9b29b8..e060ce0 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -24,6 +24,8 @@ import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import javax.annotation.Nullable;
+
 /**
  * Flink Sink to save data into a Cassandra cluster using
  * <a 
href="http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/mapping/Mapper.html";>Mapper</a>,
@@ -38,6 +40,7 @@ public class CassandraPojoSink<IN> extends 
CassandraSinkBase<IN, ResultSet> {
        private static final long serialVersionUID = 1L;
 
        protected final Class<IN> clazz;
+       private final MapperOptions options;
        protected transient Mapper<IN> mapper;
        protected transient MappingManager mappingManager;
 
@@ -47,8 +50,13 @@ public class CassandraPojoSink<IN> extends 
CassandraSinkBase<IN, ResultSet> {
         * @param clazz Class instance
         */
        public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder) {
+               this(clazz, builder, null);
+       }
+
+       public CassandraPojoSink(Class<IN> clazz, ClusterBuilder builder, 
@Nullable MapperOptions options) {
                super(builder);
                this.clazz = clazz;
+               this.options = options;
        }
 
        @Override
@@ -57,6 +65,12 @@ public class CassandraPojoSink<IN> extends 
CassandraSinkBase<IN, ResultSet> {
                try {
                        this.mappingManager = new MappingManager(session);
                        this.mapper = mappingManager.mapper(clazz);
+                       if (options != null) {
+                               Mapper.Option[] optionsArray = 
options.getMapperOptions();
+                               if (optionsArray != null) {
+                                       
this.mapper.setDefaultSaveOptions(optionsArray);
+                               }
+                       }
                } catch (Exception e) {
                        throw new RuntimeException("Cannot create 
CassandraPojoSink with input: " + clazz.getSimpleName(), e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index f902d56..29c4b21 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -230,6 +230,7 @@ public class CassandraSink<IN> {
                protected final TypeSerializer<IN> serializer;
                protected final TypeInformation<IN> typeInfo;
                protected ClusterBuilder builder;
+               protected MapperOptions mapperOptions;
                protected String query;
                protected CheckpointCommitter committer;
                protected boolean isWriteAheadLogEnabled;
@@ -321,6 +322,21 @@ public class CassandraSink<IN> {
                }
 
                /**
+                * Sets the mapper options for this sink. The mapper options 
are used to configure the DataStax
+                * {@link com.datastax.driver.mapping.Mapper} when writing 
POJOs.
+                *
+                * <p>This call has no effect if the input {@link DataStream} 
for this sink does not contain POJOs.
+                *
+                * @param options MapperOptions, that return an array of 
options that are used to configure the DataStax mapper.
+                *
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions 
options) {
+                       this.mapperOptions = options;
+                       return this;
+               }
+
+               /**
                 * Finalizes the configuration of this sink.
                 *
                 * @return finalized sink
@@ -393,7 +409,7 @@ public class CassandraSink<IN> {
 
                @Override
                public CassandraSink<IN> createSink() throws Exception {
-                       return new CassandraSink<>(input.addSink(new 
CassandraPojoSink<>(typeInfo.getTypeClass(), builder)).name("Cassandra Sink"));
+                       return new CassandraSink<>(input.addSink(new 
CassandraPojoSink<>(typeInfo.getTypeClass(), builder, 
mapperOptions)).name("Cassandra Sink"));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
new file mode 100644
index 0000000..7c2e1be
--- /dev/null
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/MapperOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.mapping.Mapper;
+
+import java.io.Serializable;
+
+/**
+ * This class is used to configure a {@link 
com.datastax.driver.mapping.Mapper} after deployment.
+ */
+public interface MapperOptions extends Serializable {
+
+       /**
+        * Returns an array of {@link 
com.datastax.driver.mapping.Mapper.Option} that are used configure the mapper.
+        *
+        * @return array of options used to configure the mapper.
+        */
+       Mapper.Option[] getMapperOptions();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/eaa5a466/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
index 01cd6e8..89e2d9e 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraPojoSinkExample.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Cluster.Builder;
+import com.datastax.driver.mapping.Mapper;
 
 import java.util.ArrayList;
 
@@ -57,6 +58,7 @@ public class CassandraPojoSinkExample {
                                        return 
builder.addContactPoint("127.0.0.1").build();
                                }
                        })
+                       .setMapperOptions(() -> new 
Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                        .build();
 
                env.execute("Cassandra Sink example");

Reply via email to