[
https://issues.apache.org/jira/browse/FLINK-3967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15299618#comment-15299618
]
ASF GitHub Bot commented on FLINK-3967:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2031#discussion_r64527750
--- Diff:
flink-streaming-connectors/flink-connector-rethinkdb/src/main/java/org/apache/flink/streaming/connectors/rethinkdb/FlinkRethinkDbSink.java
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.rethinkdb;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.Insert;
+import com.rethinkdb.gen.ast.Table;
+import com.rethinkdb.net.Connection;
+
+/**
+ * This class is the Flink sink for RethinkDB which is a tcp/JSON protocol
based document
+ * oriented NoSQL database.
+ *
+ * <p/>
+ * This sink provides two constructors:
+ * <p/>
+ * {@link #FlinkRethinkDbSink(String hostname, int hostport, String
database, String table, JSONSerializationSchema schema)}, and
+ * <p/>
+ * {@link #FlinkRethinkDbSink(String hostname, int hostport, String
database, String table, JSONSerializationSchema schema, ConflictStrategy
conflictStrategy)}
+ * <p/>
+ *
+ * The parameters for the constructors are as follows:
+ * <p/>
+ * <ul>
+ * <li>hostname - the rethinkdb hostname</li>
+ * <li>hostport - the rethinkdb port for the driver to connect</li>
+ * <li>database - the rethinkdb database name to which the table
belongs</li>
+ * <li>table - the rethinkdb table name where documents are inserted</li>
+ * <li>schema - the schema transformer that converts input to JSONObject,
or JSONArray</li>
+ * <li>conflictStrategy - the conflict resolution strategy in case
inserted document has id which exists in the db</li>
+ * </ul>
+ * <p/>
+ *
+ * The user can also set:
+ * <p/>
+ * <ul>
+ * <li>username - default is admin</li>
+ * <li>password - default is blank</li>
+ * </ul>
+ * <p/> with the {@link #setUsernameAndPassword(String, String)} method.
+ * <p/>
+ * <b>NOTE: If multiple documents are getting inserted (eg: using
JSONArray), the sink
+ * checks if there is an error entry in the result HashMap and throws a
runtime exception if the error
+ * count is not zero. The exception message contains the results
HashMap.
+ * In case of multiple errors only the first error is noted in the result
HashMap.
+ * </b>
+ *
+ * @see {@link ConflictStrategy} for conflict resolution strategies
+ *
+ * @param <OUT> a value that can be transformed into a {@link
org.json.simple.JSONArray} or {@link org.json.simple.JSONObject}
+ */
+public class FlinkRethinkDbSink<OUT> extends RichSinkFunction<OUT>
implements Serializable{
+
+ /**
+ * Serial version for the class
+ */
+ private static final long serialVersionUID = -2135499016796158755L;
+
+ /**
+ * Logger for the class
+ */
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkRethinkDbSink.class);
+
+ /**
+ * Conflict resolution option key in case document ids are same
+ */
+ public static final String CONFLICT_OPT = "conflict";
+
+ /**
+ * Result key indicating number of errors
+ */
+ public static final String RESULT_ERROR_KEY = "errors";
+
+ /**
+ * Serialization schema for the sink
+ */
+ private JSONSerializationSchema<OUT> serializationSchema;
+
+ /**
+ * RethinkDB connection object
+ */
+ private transient Connection rethinkDbConnection;
+
+ /**
+ * RethinkDB hostname
+ */
+ private String hostname;
+
+ /**
+ * RethinkDB port
+ */
+ private int hostport;
+
+ /**
+ * RethinkDB tablename where documents are inserted
+ */
+ private String tableName;
+
+ /**
+ * RethinkDB database where document are inserted
+ */
+ private String databaseName;
+
+ /**
+ * Conflict resolution strategy
+ */
+ private ConflictStrategy conflict;
+
+ /**
+ * Default user name
+ */
+ public static final String DEFAULT_USER_NAME = "admin";
+
+ /**
+ * User name
+ */
+ private String username = DEFAULT_USER_NAME;
+
+ /**
+ * Default password
+ */
+ public static final String DEFAULT_PASSWORD = "";
+
+ /**
+ * password
+ */
+ private String password = DEFAULT_PASSWORD;
+
+
+ /**
+ * Constructor for RethinkDB sink
+ * @param hostname
+ * @param hostport
+ * @param database
+ * @param table
+ * @param schema
+ */
+ public FlinkRethinkDbSink(String hostname, int hostport, String
database, String table,
+ JSONSerializationSchema<OUT> schema) {
+ this(hostname, hostport, database, table, schema,
ConflictStrategy.update);
+ }
+
+ /**
+ * Constructor for sink
+ * @param hostname
+ * @param hostport
+ * @param database name
+ * @param table name
+ * @param schema serialization converter
+ * @param conflict resolution strategy for document id conflict
+ */
+ public FlinkRethinkDbSink(String hostname, int hostport, String
database, String table,
+ JSONSerializationSchema<OUT> schema,
+ ConflictStrategy conflict) {
+ this.hostname = Objects.requireNonNull(hostname);
+ this.hostport = hostport;
+ this.databaseName = Objects.requireNonNull(database);
+ this.tableName = Objects.requireNonNull(table);
+ this.serializationSchema = Objects.requireNonNull(schema);
+ this.conflict = conflict;
+ }
+
+ /**
+ * Open the sink
+ */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ LOG.info("Received parameters : {}", parameters);
+
+ super.open(parameters);
+
+ rethinkDbConnection =
getRethinkDB().connection().hostname(hostname)
+ .port(hostport).user(username,
password).connect();
+
+ LOG.info("RethinkDb connection created for host {} port {} and
db {}",
+ hostname, hostport,databaseName);
+ }
+
+ /**
+ * Helper method to help testability
+ * @return RethinkDB instance
+ */
+ protected RethinkDB getRethinkDB() {
+ return RethinkDB.r;
+ }
+
+ /**
+ * Set username and password. If username and password are not provided,
+ * then default username (admin) and blank password are used.
+ *
+ * @param username
+ * @param password
+ *
+ * @throws IllegalArgumentException if either argument is null or blank
+ */
+ public void setUsernameAndPassword(String username, String password) {
+
+ if ( StringUtils.isBlank(username) ) {
+ throw new IllegalArgumentException("username " +
username + " cannot be null or empty" );
+ } else {
+ this.username = username;
+ }
+
+ if ( StringUtils.isBlank(password) ) {
+ throw new IllegalArgumentException("password " +
password + " cannot be null or empty" );
+ } else {
+ this.password = password;
+ }
+ }
+
+ /**
+ * Invoke the sink with the input
+ *
+ * @param value the value to be inserted
+ *
+ * @throws RuntimeException if there are errors while inserting row
into rethinkdb
+ */
+ @Override
+ public void invoke(OUT value) throws Exception {
+ LOG.debug("Received value {}", value);
+
+ Object json = serializationSchema.toJSON(value);
+ LOG.debug("Object/Json: {}/{}", value, json);
+ Insert insert = getRdbTable().insert(json).optArg(CONFLICT_OPT,
conflict.toString());
+ HashMap<String,Object> result = runInsert(insert);
+
+ LOG.debug("Object/Json/Result: {}/{}/{}", value, json, result);
+
+ if ( (Long)result.get(RESULT_ERROR_KEY) != 0 ) {
--- End diff --
This is a synchronous operation, correct? If so, I'd be curious about a
benchmark for this sink.
> Provide RethinkDB Sink for Flink
> --------------------------------
>
> Key: FLINK-3967
> URL: https://issues.apache.org/jira/browse/FLINK-3967
> Project: Flink
> Issue Type: New Feature
> Components: Streaming, Streaming Connectors
> Affects Versions: 1.0.3
> Environment: All
> Reporter: Mans Singh
> Assignee: Mans Singh
> Priority: Minor
> Labels: features
> Fix For: 1.1.0
>
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Provide a sink to stream data from Flink to RethinkDB.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)