[FLINK-3034] Redis Sink Connector

This closes #1813


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ab9e36c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ab9e36c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ab9e36c

Branch: refs/heads/master
Commit: 3ab9e36c76395a9e900ca6b87746a013ee57da32
Parents: cb78245
Author: subhankar <subhankar.bis...@target.com>
Authored: Thu Mar 17 17:40:07 2016 +0530
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Thu Jul 7 09:58:17 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/index.md         |   1 +
 docs/apis/streaming/connectors/redis.md         | 177 +++++++++++++
 docs/apis/streaming/fault_tolerance.md          |   5 +
 .../flink-connector-redis/pom.xml               |  79 ++++++
 .../streaming/connectors/redis/RedisSink.java   | 182 +++++++++++++
 .../common/config/FlinkJedisClusterConfig.java  | 187 +++++++++++++
 .../common/config/FlinkJedisConfigBase.java     |  90 +++++++
 .../common/config/FlinkJedisPoolConfig.java     | 224 ++++++++++++++++
 .../common/config/FlinkJedisSentinelConfig.java | 259 +++++++++++++++++++
 .../common/container/RedisClusterContainer.java | 161 ++++++++++++
 .../container/RedisCommandsContainer.java       | 108 ++++++++
 .../RedisCommandsContainerBuilder.java          | 116 +++++++++
 .../redis/common/container/RedisContainer.java  | 243 +++++++++++++++++
 .../redis/common/mapper/RedisCommand.java       |  86 ++++++
 .../common/mapper/RedisCommandDescription.java  |  94 +++++++
 .../redis/common/mapper/RedisDataType.java      |  66 +++++
 .../redis/common/mapper/RedisMapper.java        |  66 +++++
 .../connectors/redis/RedisITCaseBase.java       |  45 ++++
 .../redis/RedisSentinelClusterTest.java         | 100 +++++++
 .../connectors/redis/RedisSinkITCase.java       | 235 +++++++++++++++++
 .../redis/RedisSinkPublishITCase.java           | 137 ++++++++++
 .../connectors/redis/RedisSinkTest.java         |  65 +++++
 .../common/config/FlinkJedisConfigBaseTest.java |  50 ++++
 .../common/config/JedisClusterConfigTest.java   |  49 ++++
 .../common/config/JedisPoolConfigTest.java      |  29 +++
 .../common/config/JedisSentinelConfigTest.java  |  49 ++++
 .../mapper/RedisDataTypeDescriptionTest.java    |  41 +++
 flink-streaming-connectors/pom.xml              |   1 +
 28 files changed, 2945 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/docs/apis/streaming/connectors/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/index.md 
b/docs/apis/streaming/connectors/index.md
index 87182b7..83ca514 100644
--- a/docs/apis/streaming/connectors/index.md
+++ b/docs/apis/streaming/connectors/index.md
@@ -39,6 +39,7 @@ Currently these systems are supported:
  * [Twitter Streaming API](https://dev.twitter.com/docs/streaming-apis) 
(source)
  * [Apache NiFi](https://nifi.apache.org) (sink/source)
  * [Apache Cassandra](https://cassandra.apache.org/) (sink)
+ * [Redis](http://redis.io/) (sink)
 
 To run an application using one of these connectors, additional third party
 components are usually required to be installed and launched, e.g. the servers

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/docs/apis/streaming/connectors/redis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/redis.md 
b/docs/apis/streaming/connectors/redis.md
new file mode 100644
index 0000000..dfa5296
--- /dev/null
+++ b/docs/apis/streaming/connectors/redis.md
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis 
PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with Redis 2.8.5.
+
+Note that the streaming connectors are currently not part of the binary 
distribution. You need to link them for cluster execution 
[explicitly]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Redis
+Follow the instructions from the [Redis download 
page](http://redis.io/download).
+
+#### Redis Sink
+A class providing an interface for sending data to Redis. 
+The sink can use three different methods for communicating with different type 
of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+
+This code shows how to create a sink that communicate to a single redis server:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public static class RedisExampleMapper implements RedisMapper<Tuple2<String, 
String>>{
+
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
+    }
+
+    @Override
+    public String getKeyFromData(Tuple2<String, String> data) {
+        return data.f0;
+    }
+
+    @Override
+    public String getValueFromData(Tuple2<String, String> data) {
+        return data.f1;
+    }
+}
+FlinkJedisPoolConfig conf = new 
FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+DataStream<String> stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getCommandDescription: RedisCommandDescription = {
+    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
+  }
+
+  override def getKeyFromData(data: (String, String)): String = data._1
+
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+</div>
+</div>
+
+This example code does the same, but for Redis Cluster:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new 
InetSocketAddress(5601)))).build();
+
+DataStream<String> stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+</div>
+</div>
+
+This example shows when the Redis environment is with Sentinels:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
+    .setMasterName("master").setSentinels(...).build();
+
+DataStream<String> stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val conf = new 
FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+</div>
+</div>
+
+This section gives a description of all the available data types and what 
Redis command used for that.
+
+<table class="table table-bordered" style="width: 75%">
+    <thead>
+        <tr>
+          <th class="text-center" style="width: 20%">Data Type</th>
+          <th class="text-center" style="width: 25%">Redis Command [Sink]</th>
+          <th class="text-center" style="width: 25%">Redis Command 
[Source]</th>
+        </tr>
+      </thead>
+      <tbody>
+        <tr>
+            <td>HASH</td><td><a 
href="http://redis.io/commands/hset";>HSET</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>LIST</td><td>
+                <a href="http://redis.io/commands/rpush";>RPUSH</a>, 
+                <a href="http://redis.io/commands/lpush";>LPUSH</a>
+            </td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>SET</td><td><a 
href="http://redis.io/commands/rpush";>SADD</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>PUBSUB</td><td><a 
href="http://redis.io/commands/publish";>PUBLISH</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>STRING</td><td><a 
href="http://redis.io/commands/set";>SET</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>HYPER_LOG_LOG</td><td><a 
href="http://redis.io/commands/pfadd";>PFADD</a></td><td>--NA--</td>
+        </tr>
+        <tr>
+            <td>SORTED_SET</td><td><a 
href="http://redis.io/commands/zadd";>ZADD</a></td><td>--NA--</td>
+        </tr>                
+      </tbody>
+</table>
+More about Redis can be found [here](http://redis.io/).

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md 
b/docs/apis/streaming/fault_tolerance.md
index 0e98632..89c2557 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -206,6 +206,11 @@ state updates) of Flink coupled with bundled sinks:
         <td>at least once</td>
         <td></td>
     </tr>
+    <tr>
+        <td>Redis sink</td>
+        <td>at least once</td>
+        <td></td>
+    </tr>
   </tbody>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-redis/pom.xml 
b/flink-streaming-connectors/flink-connector-redis/pom.xml
new file mode 100644
index 0000000..6c8f609
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-redis/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-streaming-connectors</artifactId>
+               <version>1.1-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-connector-redis_2.10</artifactId>
+       <name>flink-connector-redis</name>
+
+       <packaging>jar</packaging>
+
+       <properties>
+               <jedis.version>2.8.0</jedis.version>
+       </properties>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>redis.clients</groupId>
+                       <artifactId>jedis</artifactId>
+                       <version>${jedis.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.github.kstyrc</groupId>
+                       <artifactId>embedded-redis</artifactId>
+                       <version>0.6</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
new file mode 100644
index 0000000..43518e8
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * <p> The sink takes two arguments {@link FlinkJedisConfigBase} and {@link 
RedisMapper}.
+ * <p> When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ * <p> When {@link FlinkJedisSentinelConfig} is passed as the first argument, 
the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when 
you want to connect to Sentinel.
+ * <p> Please use {@link FlinkJedisClusterConfig} as the first argument if you 
want to connect to
+ * a Redis Cluster.
+ *
+ * <p>Example:
+ *
+ * <pre>
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, 
String>> {
+ *
+ *     private RedisCommand redisCommand;
+ *
+ *     public RedisExampleMapper(RedisCommand redisCommand){
+ *             this.redisCommand = redisCommand;
+ *     }
+ *     public RedisCommandDescription getCommandDescription() {
+ *             return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ *     }
+ *     public String getKeyFromData(Tuple2<String, String> data) {
+ *             return data.f0;
+ *     }
+ *     public String getValueFromData(Tuple2<String, String> data) {
+ *             return data.f1;
+ *     }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink<String>(jedisPoolConfig, new 
RedisExampleMapper(RedisCommand.LPUSH));
+ *}</pre>
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+       /**
+        * This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+        * Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+        * But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+        * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+        * {@code additionalKey} used as hash name for {@link 
RedisDataType#HASH}
+        * <p>For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+        * {@code additionalKey} used as set name for {@link 
RedisDataType#SORTED_SET}
+        */
+       private String additionalKey;
+       private RedisMapper<IN> redisSinkMapper;
+       private RedisCommand redisCommand;
+
+       private FlinkJedisConfigBase flinkJedisConfigBase;
+       private RedisCommandsContainer redisCommandsContainer;
+
+       /**
+        * Creates a new {@link RedisSink} that connects to the Redis server.
+        *
+        * @param flinkJedisConfigBase The configuration of {@link 
FlinkJedisConfigBase}
+        * @param redisSinkMapper This is used to generate Redis command and 
key value from incoming elements.
+        */
+       public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, 
RedisMapper<IN> redisSinkMapper) {
+               Preconditions.checkNotNull(flinkJedisConfigBase, "Redis 
connection pool config should not be null");
+               Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can 
not be null");
+               
Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis 
Mapper data type description can not be null");
+
+               this.flinkJedisConfigBase = flinkJedisConfigBase;
+
+               this.redisSinkMapper = redisSinkMapper;
+               RedisCommandDescription redisCommandDescription = 
redisSinkMapper.getCommandDescription();
+               this.redisCommand = redisCommandDescription.getCommand();
+               this.additionalKey = redisCommandDescription.getAdditionalKey();
+       }
+
+       /**
+        * Called when new data arrives to the sink, and forwards it to Redis 
channel.
+        * Depending on the specified Redis data type (see {@link 
RedisDataType}),
+        * a different Redis command will be applied.
+        * Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, 
HSET, ZADD.
+        *
+        * @param input The incoming data
+        */
+       @Override
+       public void invoke(IN input) throws Exception {
+               String key = redisSinkMapper.getKeyFromData(input);
+               String value = redisSinkMapper.getValueFromData(input);
+
+               switch (redisCommand) {
+                       case RPUSH:
+                               this.redisCommandsContainer.rpush(key, value);
+                               break;
+                       case LPUSH:
+                               this.redisCommandsContainer.lpush(key, value);
+                               break;
+                       case SADD:
+                               this.redisCommandsContainer.sadd(key, value);
+                               break;
+                       case SET:
+                               this.redisCommandsContainer.set(key, value);
+                               break;
+                       case PFADD:
+                               this.redisCommandsContainer.pfadd(key, value);
+                               break;
+                       case PUBLISH:
+                               this.redisCommandsContainer.publish(key, value);
+                               break;
+                       case ZADD:
+                               
this.redisCommandsContainer.zadd(this.additionalKey, value, key);
+                               break;
+                       case HSET:
+                               
this.redisCommandsContainer.hset(this.additionalKey, key, value);
+                               break;
+                       default:
+                               throw new IllegalArgumentException("Cannot 
process such data type: " + redisCommand);
+               }
+       }
+
+       /**
+        * Initializes the connection to Redis by either cluster or sentinels 
or single server.
+        *
+        * @throws IllegalArgumentException if jedisPoolConfig, 
jedisClusterConfig and jedisSentinelConfig are all null
+     */
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               this.redisCommandsContainer = 
RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
+       }
+
+       /**
+        * Closes commands container.
+        * @throws IOException if command container is unable to close.
+        */
+       @Override
+       public void close() throws IOException {
+               if (redisCommandsContainer != null) {
+                       redisCommandsContainer.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
new file mode 100644
index 0000000..6e6cfe5
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+       private static final long serialVersionUID = 1L;
+
+       private final Set<InetSocketAddress> nodes;
+       private final int maxRedirections;
+
+
+       /**
+        * Jedis cluster configuration.
+        * The list of node is mandatory, and when nodes is not set, it throws 
NullPointerException.
+        *
+        * @param nodes list of node information for JedisCluster
+        * @param connectionTimeout socket / connection timeout. The default is 
2000
+        * @param maxRedirections limit of redirections-how much we'll follow 
MOVED or ASK
+        * @param maxTotal the maximum number of objects that can be allocated 
by the pool
+        * @param maxIdle the cap on the number of "idle" instances in the pool
+        * @param minIdle the minimum number of idle objects to maintain in the 
pool
+        * @throws NullPointerException if parameter {@code nodes} is {@code 
null}
+        */
+       private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
+                                                                       int 
maxTotal, int maxIdle, int minIdle) {
+               super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+               Preconditions.checkNotNull(nodes, "Node information should be 
presented");
+               Preconditions.checkArgument(!nodes.isEmpty(), "Redis cluster 
hosts should not be empty");
+               this.nodes = new HashSet<>(nodes);
+               this.maxRedirections = maxRedirections;
+       }
+
+
+
+       /**
+        * Returns nodes.
+        *
+        * @return list of node information
+        */
+       public Set<HostAndPort> getNodes() {
+               Set<HostAndPort> ret = new HashSet<>();
+               for (InetSocketAddress node : nodes) {
+                       ret.add(new HostAndPort(node.getHostName(), 
node.getPort()));
+               }
+               return ret;
+       }
+
+       /**
+        * Returns limit of redirection.
+        *
+        * @return limit of redirection
+        */
+       public int getMaxRedirections() {
+               return maxRedirections;
+       }
+
+
+       /**
+        * Builder for initializing  {@link FlinkJedisClusterConfig}.
+        */
+       public static class Builder {
+               private Set<InetSocketAddress> nodes;
+               private int timeout = Protocol.DEFAULT_TIMEOUT;
+               private int maxRedirections = 5;
+               private int maxTotal = 
GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+               private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+               private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+               /**
+                * Sets list of node.
+                *
+                * @param nodes list of node
+                * @return Builder itself
+                */
+               public Builder setNodes(Set<InetSocketAddress> nodes) {
+                       this.nodes = nodes;
+                       return this;
+               }
+
+               /**
+                * Sets socket / connection timeout.
+                *
+                * @param timeout socket / connection timeout, default value is 
2000
+                * @return Builder itself
+                */
+               public Builder setTimeout(int timeout) {
+                       this.timeout = timeout;
+                       return this;
+               }
+
+               /**
+                * Sets limit of redirection.
+                *
+                * @param maxRedirections limit of redirection, default value 
is 5
+                * @return Builder itself
+                */
+               public Builder setMaxRedirections(int maxRedirections) {
+                       this.maxRedirections = maxRedirections;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxTotal} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxTotal maxTotal the maximum number of objects that 
can be allocated by the pool, default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxTotal(int maxTotal) {
+                       this.maxTotal = maxTotal;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxIdle the cap on the number of "idle" instances in 
the pool, default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxIdle(int maxIdle) {
+                       this.maxIdle = maxIdle;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code minIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param minIdle the minimum number of idle objects to 
maintain in the pool, default value is 0
+                * @return Builder itself
+                */
+               public Builder setMinIdle(int minIdle) {
+                       this.minIdle = minIdle;
+                       return this;
+               }
+
+               /**
+                * Builds JedisClusterConfig.
+                *
+                * @return JedisClusterConfig
+                */
+               public FlinkJedisClusterConfig build() {
+                       return new FlinkJedisClusterConfig(nodes, timeout, 
maxRedirections, maxTotal, maxIdle, minIdle);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "JedisClusterConfig{" +
+                       "nodes=" + nodes +
+                       ", timeout=" + connectionTimeout +
+                       ", maxRedirections=" + maxRedirections +
+                       ", maxTotal=" + maxTotal +
+                       ", maxIdle=" + maxIdle +
+                       ", minIdle=" + minIdle +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
new file mode 100644
index 0000000..a2489b8
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+       private static final long serialVersionUID = 1L;
+
+       protected final int maxTotal;
+       protected final int maxIdle;
+       protected final int minIdle;
+       protected final int connectionTimeout;
+
+       protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int 
maxIdle, int minIdle){
+               Preconditions.checkArgument(connectionTimeout >= 0, "connection 
timeout can not be negative");
+               Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can 
not be negative");
+               Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can 
not be negative");
+               Preconditions.checkArgument(minIdle >= 0, "minIdle value can 
not be negative");
+               this.connectionTimeout = connectionTimeout;
+               this.maxTotal = maxTotal;
+               this.maxIdle = maxIdle;
+               this.minIdle = minIdle;
+       }
+
+       /**
+        * Returns timeout.
+        *
+        * @return connection timeout
+        */
+       public int getConnectionTimeout() {
+               return connectionTimeout;
+       }
+
+       /**
+        * Get the value for the {@code maxTotal} configuration attribute
+        * for pools to be created with this configuration instance.
+        *
+        * @return  The current setting of {@code maxTotal} for this
+        *          configuration instance
+        * @see GenericObjectPoolConfig#getMaxTotal()
+        */
+       public int getMaxTotal() {
+               return maxTotal;
+       }
+
+       /**
+        * Get the value for the {@code maxIdle} configuration attribute
+        * for pools to be created with this configuration instance.
+        *
+        * @return  The current setting of {@code maxIdle} for this
+        *          configuration instance
+        * @see GenericObjectPoolConfig#getMaxIdle()
+        */
+       public int getMaxIdle() {
+               return maxIdle;
+       }
+
+       /**
+        * Get the value for the {@code minIdle} configuration attribute
+        * for pools to be created with this configuration instance.
+        *
+        * @return  The current setting of {@code minIdle} for this
+        *          configuration instance
+        * @see GenericObjectPoolConfig#getMinIdle()
+        */
+       public int getMinIdle() {
+               return minIdle;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
new file mode 100644
index 0000000..d261a35
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Configuration for Jedis pool.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+       private static final long serialVersionUID = 1L;
+
+       private final String host;
+       private final int port;
+       private final int database;
+       private final String password;
+
+
+       /**
+        * Jedis pool configuration.
+        * The host is mandatory, and when host is not set, it throws 
NullPointerException.
+        *
+        * @param host hostname or IP
+        * @param port port, default value is 6379
+        * @param connectionTimeout socket / connection timeout, default value 
is 2000 milli second
+        * @param password password, if any
+        * @param database database index
+        * @param maxTotal the maximum number of objects that can be allocated 
by the pool, default value is 8
+        * @param maxIdle the cap on the number of "idle" instances in the 
pool, default value is 8
+        * @param minIdle the minimum number of idle objects to maintain in the 
pool, default value is 0
+        * @throws NullPointerException if parameter {@code host} is {@code 
null}
+        */
+       private FlinkJedisPoolConfig(String host, int port, int 
connectionTimeout, String password, int database,
+                                                               int maxTotal, 
int maxIdle, int minIdle) {
+               super(connectionTimeout, maxTotal, maxIdle, minIdle);
+               Preconditions.checkNotNull(host, "Host information should be 
presented");
+               this.host = host;
+               this.port = port;
+               this.database = database;
+               this.password = password;
+       }
+
+       /**
+        * Returns host.
+        *
+        * @return hostname or IP
+        */
+       public String getHost() {
+               return host;
+       }
+
+       /**
+        * Returns port.
+        *
+        * @return port
+        */
+       public int getPort() {
+               return port;
+       }
+
+
+       /**
+        * Returns database index.
+        *
+        * @return database index
+        */
+       public int getDatabase() {
+               return database;
+       }
+
+       /**
+        * Returns password.
+        *
+        * @return password
+        */
+       public String getPassword() {
+               return password;
+       }
+
+       /**
+        * Builder for initializing  {@link FlinkJedisPoolConfig}.
+        */
+       public static class Builder {
+               private String host;
+               private int port = Protocol.DEFAULT_PORT;
+               private int timeout = Protocol.DEFAULT_TIMEOUT;
+               private int database = Protocol.DEFAULT_DATABASE;
+               private String password;
+               private int maxTotal = 
GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+               private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+               private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+               /**
+                * Sets value for the {@code maxTotal} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxTotal maxTotal the maximum number of objects that 
can be allocated by the pool, default value is 8
+         * @return Builder itself
+         */
+               public Builder setMaxTotal(int maxTotal) {
+                       this.maxTotal = maxTotal;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxIdle the cap on the number of "idle" instances in 
the pool, default value is 8
+         * @return Builder itself
+         */
+               public Builder setMaxIdle(int maxIdle) {
+                       this.maxIdle = maxIdle;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code minIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param minIdle the minimum number of idle objects to 
maintain in the pool, default value is 0
+         * @return Builder itself
+         */
+               public Builder setMinIdle(int minIdle) {
+                       this.minIdle = minIdle;
+                       return this;
+               }
+
+               /**
+                * Sets host.
+                *
+                * @param host host
+                * @return Builder itself
+                */
+               public Builder setHost(String host) {
+                       this.host = host;
+                       return this;
+               }
+
+               /**
+                * Sets port.
+                *
+                * @param port port, default value is 6379
+                * @return Builder itself
+                */
+               public Builder setPort(int port) {
+                       this.port = port;
+                       return this;
+               }
+
+               /**
+                * Sets timeout.
+                *
+                * @param timeout timeout, default value is 2000
+                * @return Builder itself
+                */
+               public Builder setTimeout(int timeout) {
+                       this.timeout = timeout;
+                       return this;
+               }
+
+               /**
+                * Sets database index.
+                *
+                * @param database database index, default value is 0
+                * @return Builder itself
+                */
+               public Builder setDatabase(int database) {
+                       this.database = database;
+                       return this;
+               }
+
+               /**
+                * Sets password.
+                *
+                * @param password password, if any
+                * @return Builder itself
+                */
+               public Builder setPassword(String password) {
+                       this.password = password;
+                       return this;
+               }
+
+
+               /**
+                * Builds JedisPoolConfig.
+                *
+                * @return JedisPoolConfig
+                */
+               public FlinkJedisPoolConfig build() {
+                       return new FlinkJedisPoolConfig(host, port, timeout, 
password, database, maxTotal, maxIdle, minIdle);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "JedisPoolConfig{" +
+                       "host='" + host + '\'' +
+                       ", port=" + port +
+                       ", timeout=" + connectionTimeout +
+                       ", database=" + database +
+                       ", maxTotal=" + maxTotal +
+                       ", maxIdle=" + maxIdle +
+                       ", minIdle=" + minIdle +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
new file mode 100644
index 0000000..2cdb397
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+       private final String masterName;
+       private final Set<String> sentinels;
+       private final int soTimeout;
+       private final String password;
+       private final int database;
+
+       /**
+        * Jedis Sentinels config.
+        * The master name and sentinels are mandatory, and when you didn't set 
these, it throws NullPointerException.
+        *
+        * @param masterName master name of the replica set
+        * @param sentinels set of sentinel hosts
+        * @param connectionTimeout timeout connection timeout
+        * @param soTimeout timeout socket timeout
+        * @param password password, if any
+        * @param database database database index
+        * @param maxTotal maxTotal the maximum number of objects that can be 
allocated by the pool
+        * @param maxIdle the cap on the number of "idle" instances in the pool
+        * @param minIdle the minimum number of idle objects to maintain in the 
pool
+        *
+        * @throws NullPointerException if {@code masterName} or {@code 
sentinels} is {@code null}
+        * @throws IllegalArgumentException if {@code sentinels} are empty
+        */
+       private FlinkJedisSentinelConfig(String masterName, Set<String> 
sentinels,
+                                                                       int 
connectionTimeout, int soTimeout,
+                                                                       String 
password, int database,
+                                                                       int 
maxTotal, int maxIdle, int minIdle) {
+               super(connectionTimeout, maxTotal, maxIdle, minIdle);
+               Preconditions.checkNotNull(masterName, "Master name should be 
presented");
+               Preconditions.checkNotNull(sentinels, "Sentinels information 
should be presented");
+               Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel 
hosts should not be empty");
+
+               this.masterName = masterName;
+               this.sentinels = new HashSet<>(sentinels);
+               this.soTimeout = soTimeout;
+               this.password = password;
+               this.database = database;
+       }
+
+       /**
+        * Returns master name of the replica set.
+        *
+        * @return master name of the replica set.
+        */
+       public String getMasterName() {
+               return masterName;
+       }
+
+       /**
+        * Returns Sentinels host addresses.
+        *
+        * @return Set of Sentinels host addresses
+        */
+       public Set<String> getSentinels() {
+               return sentinels;
+       }
+
+       /**
+        * Returns socket timeout.
+        *
+        * @return socket timeout
+        */
+       public int getSoTimeout() {
+               return soTimeout;
+       }
+
+       /**
+        * Returns password.
+        *
+        * @return password
+        */
+       public String getPassword() {
+               return password;
+       }
+
+       /**
+        * Returns database index.
+        *
+        * @return database index
+        */
+       public int getDatabase() {
+               return database;
+       }
+
+       /**
+        * Builder for initializing {@link FlinkJedisSentinelConfig}.
+        */
+       public static class Builder {
+               private String masterName;
+               private Set<String> sentinels;
+               private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;
+               private int soTimeout = Protocol.DEFAULT_TIMEOUT;
+               private String password;
+               private int database = Protocol.DEFAULT_DATABASE;
+               private int maxTotal = 
GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
+               private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
+               private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+
+               /**
+                * Sets master name of the replica set.
+                *
+                * @param masterName  master name of the replica set
+                * @return Builder itself
+         */
+               public Builder setMasterName(String masterName) {
+                       this.masterName = masterName;
+                       return this;
+               }
+
+               /**
+                * Sets sentinels address.
+                *
+                * @param sentinels host set of the sentinels
+                * @return Builder itself
+         */
+               public Builder setSentinels(Set<String> sentinels) {
+                       this.sentinels = sentinels;
+                       return this;
+               }
+
+               /**
+                * Sets connection timeout.
+                *
+                * @param connectionTimeout connection timeout, default value 
is 2000
+                * @return Builder itself
+                */
+               public Builder setConnectionTimeout(int connectionTimeout) {
+                       this.connectionTimeout = connectionTimeout;
+                       return this;
+               }
+
+               /**
+                * Sets socket timeout.
+                *
+                * @param soTimeout socket timeout, default value is 2000
+         * @return Builder itself
+         */
+               public Builder setSoTimeout(int soTimeout) {
+                       this.soTimeout = soTimeout;
+                       return this;
+               }
+
+               /**
+                * Sets password.
+                *
+                * @param password password, if any
+                * @return Builder itself
+                */
+               public Builder setPassword(String password) {
+                       this.password = password;
+                       return this;
+               }
+
+               /**
+                * Sets database index.
+                *
+                * @param database database index, default value is 0
+                * @return Builder itself
+                */
+               public Builder setDatabase(int database) {
+                       this.database = database;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxTotal} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxTotal maxTotal the maximum number of objects that 
can be allocated by the pool, default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxTotal(int maxTotal) {
+                       this.maxTotal = maxTotal;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code maxIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param maxIdle the cap on the number of "idle" instances in 
the pool, default value is 8
+                * @return Builder itself
+                */
+               public Builder setMaxIdle(int maxIdle) {
+                       this.maxIdle = maxIdle;
+                       return this;
+               }
+
+               /**
+                * Sets value for the {@code minIdle} configuration attribute
+                * for pools to be created with this configuration instance.
+                *
+                * @param minIdle the minimum number of idle objects to 
maintain in the pool, default value is 0
+                * @return Builder itself
+                */
+               public Builder setMinIdle(int minIdle) {
+                       this.minIdle = minIdle;
+                       return this;
+               }
+
+               /**
+                * Builds JedisSentinelConfig.
+                *
+                * @return JedisSentinelConfig
+                */
+               public FlinkJedisSentinelConfig build(){
+                       return new FlinkJedisSentinelConfig(masterName, 
sentinels, connectionTimeout, soTimeout,
+                               password, database, maxTotal, maxIdle, minIdle);
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "JedisSentinelConfig{" +
+                       "masterName='" + masterName + '\'' +
+                       ", connectionTimeout=" + connectionTimeout +
+                       ", soTimeout=" + soTimeout +
+                       ", database=" + database +
+                       ", maxTotal=" + maxTotal +
+                       ", maxIdle=" + maxIdle +
+                       ", minIdle=" + minIdle +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
new file mode 100644
index 0000000..7551c9e
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+       private JedisCluster jedisCluster;
+
+       /**
+        * Initialize Redis command container for Redis cluster.
+        *
+        * @param jedisCluster JedisCluster instance
+        */
+       public RedisClusterContainer(JedisCluster jedisCluster) {
+               Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not 
be null");
+
+               this.jedisCluster = jedisCluster;
+       }
+
+       @Override
+       public void hset(final String key, final String hashField, final String 
value) {
+               try {
+                       jedisCluster.hset(key, hashField, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command HSET to hash {} error message {}",
+                                       key, hashField, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void rpush(final String listName, final String value) {
+               try {
+                       jedisCluster.rpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message: {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void lpush(String listName, String value) {
+               try {
+                       jedisCluster.lpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command LPUSH to list {} error message: {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void sadd(final String setName, final String value) {
+               try {
+                       jedisCluster.sadd(setName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to set {} error message {}",
+                                       setName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void publish(final String channelName, final String message) {
+               try {
+                       jedisCluster.publish(channelName, message);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PUBLISH to channel {} error message {}",
+                                       channelName, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void set(final String key, final String value) {
+               try {
+                       jedisCluster.set(key, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command SET to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void pfadd(final String key, final String element) {
+               try {
+                       jedisCluster.set(key, element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PFADD to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       @Override
+       public void zadd(final String key, final String score, final String 
element) {
+               try {
+                       jedisCluster.zadd(key, Double.valueOf(score), element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command ZADD to set {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               }
+       }
+
+       /**
+        * Closes the {@link JedisCluster}.
+        */
+       @Override
+       public void close() throws IOException {
+               this.jedisCluster.close();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
new file mode 100644
index 0000000..1b92c2e
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis commands.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+       /**
+        * Sets field in the hash stored at key to value.
+        * If key does not exist, a new key holding a hash is created.
+        * If field already exists in the hash, it is overwritten.
+        *
+        * @param key Hash name
+        * @param hashField Hash field
+        * @param value Hash value
+        */
+       void hset(String key, String hashField, String value);
+
+       /**
+        * Insert the specified value at the tail of the list stored at key.
+        * If key does not exist, it is created as empty list before performing 
the push operation.
+        *
+        * @param listName Name of the List
+        * @param value  Value to be added
+        */
+       void rpush(String listName, String value);
+
+       /**
+        * Insert the specified value at the head of the list stored at key.
+        * If key does not exist, it is created as empty list before performing 
the push operation.
+        *
+        * @param listName Name of the List
+        * @param value  Value to be added
+        */
+       void lpush(String listName, String value);
+
+       /**
+        * Add the specified member to the set stored at key.
+        * Specified members that are already a member of this set are ignored.
+        * If key does not exist, a new set is created before adding the 
specified members.
+        *
+        * @param setName Name of the Set
+        * @param value Value to be added
+        */
+       void sadd(String setName, String value);
+
+       /**
+        * Posts a message to the given channel.
+        *
+        * @param channelName Name of the channel to which data will be 
published
+        * @param message the message
+        */
+       void publish(String channelName, String message);
+
+       /**
+        * Set key to hold the string value. If key already holds a value, it 
is overwritten,
+        * regardless of its type. Any previous time to live associated with 
the key is
+        * discarded on successful SET operation.
+        *
+        * @param key the key name in which value to be set
+        * @param value the value
+        */
+       void set(String key, String value);
+
+       /**
+        * Adds all the element arguments to the HyperLogLog data structure
+        * stored at the variable name specified as first argument.
+        *
+        * @param key The name of the key
+        * @param element the element
+        */
+       void pfadd(String key, String element);
+
+       /**
+        * Adds the specified member with the specified scores to the sorted 
set stored at key.
+        *
+        * @param key The name of the Sorted Set
+        * @param score Score of the element
+        * @param element  element to be added
+        */
+       void zadd(String key, String score, String element);
+
+       /**
+        * Close the Jedis container.
+        *
+        * @throws IOException if the instance can not be closed properly
+        */
+       void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
new file mode 100644
index 0000000..dc5396a
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+       /**
+        * Initialize the {@link RedisCommandsContainer} based on the instance 
type.
+        * @param flinkJedisConfigBase configuration base
+        * @return @throws IllegalArgumentException if jedisPoolConfig, 
jedisClusterConfig and jedisSentinelConfig are all null
+     */
+       public static RedisCommandsContainer build(FlinkJedisConfigBase 
flinkJedisConfigBase){
+               if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
+                       FlinkJedisPoolConfig flinkJedisPoolConfig = 
(FlinkJedisPoolConfig) flinkJedisConfigBase;
+                       return 
RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
+               } else if (flinkJedisConfigBase instanceof 
FlinkJedisClusterConfig) {
+                       FlinkJedisClusterConfig flinkJedisClusterConfig = 
(FlinkJedisClusterConfig) flinkJedisConfigBase;
+                       return 
RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
+               } else if (flinkJedisConfigBase instanceof 
FlinkJedisSentinelConfig) {
+                       FlinkJedisSentinelConfig flinkJedisSentinelConfig = 
(FlinkJedisSentinelConfig) flinkJedisConfigBase;
+                       return 
RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
+               } else {
+                       throw new IllegalArgumentException("Jedis configuration 
not found");
+               }
+       }
+
+       /**
+        * Builds container for single Redis environment.
+        *
+        * @param jedisPoolConfig configuration for JedisPool
+        * @return container for single Redis environment
+        * @throws NullPointerException if jedisPoolConfig is null
+        */
+       public static RedisCommandsContainer build(FlinkJedisPoolConfig 
jedisPoolConfig) {
+               Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config 
should not be Null");
+
+               GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+               
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+               
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+               
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+               JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+                       jedisPoolConfig.getPort(), 
jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+                       jedisPoolConfig.getDatabase());
+               return new RedisContainer(jedisPool);
+       }
+
+       /**
+        * Builds container for Redis Cluster environment.
+        *
+        * @param jedisClusterConfig configuration for JedisCluster
+        * @return container for Redis Cluster environment
+        * @throws NullPointerException if jedisClusterConfig is null
+        */
+       public static RedisCommandsContainer build(FlinkJedisClusterConfig 
jedisClusterConfig) {
+               Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster 
config should not be Null");
+
+               GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+               
genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
+               
genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
+               
genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());
+
+               JedisCluster jedisCluster = new 
JedisCluster(jedisClusterConfig.getNodes(), 
jedisClusterConfig.getConnectionTimeout(),
+                       jedisClusterConfig.getMaxRedirections(), 
genericObjectPoolConfig);
+               return new RedisClusterContainer(jedisCluster);
+       }
+
+       /**
+        * Builds container for Redis Sentinel environment.
+        *
+        * @param jedisSentinelConfig configuration for JedisSentinel
+        * @return container for Redis sentinel environment
+        * @throws NullPointerException if jedisSentinelConfig is null
+        */
+       public static RedisCommandsContainer build(FlinkJedisSentinelConfig 
jedisSentinelConfig) {
+               Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel 
config should not be Null");
+
+               GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+               
genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
+               
genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
+               
genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
+
+               JedisSentinelPool jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+                       jedisSentinelConfig.getSentinels(), 
genericObjectPoolConfig,
+                       jedisSentinelConfig.getConnectionTimeout(), 
jedisSentinelConfig.getSoTimeout(),
+                       jedisSentinelConfig.getPassword(), 
jedisSentinelConfig.getDatabase());
+               return new RedisContainer(jedisSentinelPool);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
new file mode 100644
index 0000000..8684e9a
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server or 
to Redis sentinels
+ * If want to connect to a single Redis server, please use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Please use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+       private final JedisPool jedisPool;
+       private final JedisSentinelPool jedisSentinelPool;
+
+
+       /**
+        * Use this constructor if to connect with single Redis server.
+        *
+        * @param jedisPool JedisPool which actually manages Jedis instances
+        */
+       public RedisContainer(JedisPool jedisPool) {
+               Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+               this.jedisPool = jedisPool;
+               this.jedisSentinelPool = null;
+       }
+
+       /**
+        * Use this constructor if Redis environment is clustered with 
sentinels.
+        *
+        * @param sentinelPool SentinelPool which actually manages Jedis 
instances
+        */
+       public RedisContainer(final JedisSentinelPool sentinelPool) {
+               Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+               this.jedisPool = null;
+               this.jedisSentinelPool = sentinelPool;
+       }
+
+       /**
+        * Closes the Jedis instances.
+        */
+       @Override
+       public void close() throws IOException {
+               if (this.jedisPool != null) {
+                       this.jedisPool.close();
+               }
+               if (this.jedisSentinelPool != null) {
+                       this.jedisSentinelPool.close();
+               }
+       }
+
+       @Override
+       public void hset(final String key, final String hashField, final String 
value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.hset(key, hashField, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command HSET to key {} and hashField {} error message {}",
+                                       key, hashField, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void rpush(final String listName, final String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.rpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void lpush(String listName, String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.lpush(listName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command LUSH to list {} error message {}",
+                                       listName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void sadd(final String setName, final String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.sadd(setName, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command RPUSH to set {} error message {}",
+                                       setName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void publish(final String channelName, final String message) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.publish(channelName, message);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PUBLISH to channel {} error message {}",
+                                       channelName, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void set(final String key, final String value) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.set(key, value);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command SET to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void pfadd(final String key, final String element) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.pfadd(key, element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command PFADD to key {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       @Override
+       public void zadd(final String key, final String score, final String 
element) {
+               Jedis jedis = null;
+               try {
+                       jedis = getInstance();
+                       jedis.zadd(key, Double.valueOf(score), element);
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Cannot send Redis message with 
command ZADD to set {} error message {}",
+                                       key, e.getMessage());
+                       }
+                       throw e;
+               } finally {
+                       releaseInstance(jedis);
+               }
+       }
+
+       /**
+        * Returns Jedis instance from the pool.
+        *
+        * @return the Jedis instance
+     */
+       private Jedis getInstance() {
+               if (jedisSentinelPool != null) {
+                       return jedisSentinelPool.getResource();
+               } else {
+                       return jedisPool.getResource();
+               }
+       }
+
+       /**
+        * Closes the jedis instance after finishing the command.
+        *
+        * @param jedis The jedis instance
+     */
+       private void releaseInstance(final Jedis jedis) {
+               if (jedis == null) {
+                       return;
+               }
+               try {
+                       jedis.close();
+               } catch (Exception e) {
+                       LOG.error("Failed to close (return) instance to pool", 
e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
new file mode 100644
index 0000000..b0661c7
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+       /**
+        * Insert the specified value at the head of the list stored at key.
+        * If key does not exist, it is created as empty list before performing 
the push operations.
+        */
+       LPUSH(RedisDataType.LIST),
+
+       /**
+        * Insert the specified value at the tail of the list stored at key.
+        * If key does not exist, it is created as empty list before performing 
the push operation.
+        */
+       RPUSH(RedisDataType.LIST),
+
+       /**
+        * Add the specified member to the set stored at key.
+        * Specified member that is already a member of this set is ignored.
+        */
+       SADD(RedisDataType.SET),
+
+       /**
+        * Set key to hold the string value. If key already holds a value,
+        * it is overwritten, regardless of its type.
+        */
+       SET(RedisDataType.STRING),
+
+       /**
+        * Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+        */
+       PFADD(RedisDataType.HYPER_LOG_LOG),
+
+       /**
+        * Posts a message to the given channel.
+        */
+       PUBLISH(RedisDataType.PUBSUB),
+
+       /**
+        * Adds the specified members with the specified score to the sorted 
set stored at key.
+        */
+       ZADD(RedisDataType.SORTED_SET),
+
+       /**
+        * Sets field in the hash stored at key to value. If key does not exist,
+        * a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+        */
+       HSET(RedisDataType.HASH);
+
+       /**
+        * The {@link RedisDataType} this command belongs to.
+        */
+       private RedisDataType redisDataType;
+
+       RedisCommand(RedisDataType redisDataType) {
+               this.redisDataType = redisDataType;
+       }
+
+
+       /**
+        * The {@link RedisDataType} this command belongs to.
+        * @return the {@link RedisDataType}
+        */
+       public RedisDataType getRedisDataType(){
+               return redisDataType;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
new file mode 100644
index 0000000..1eea48a
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommandDescription.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * The description of the command type. This must be passed while creating new 
{@link RedisMapper}.
+ * <p>When creating descriptor for the group of {@link RedisDataType#HASH} and 
{@link RedisDataType#SORTED_SET},
+ * you need to use first constructor {@link 
#RedisCommandDescription(RedisCommand, String)}.
+ * If the {@code additionalKey} is {@code null} it will throw {@code 
IllegalArgumentException}
+ *
+ * <p>When {@link RedisCommand} is not in group of {@link RedisDataType#HASH} 
and {@link RedisDataType#SORTED_SET}
+ * you can use second constructor {@link 
#RedisCommandDescription(RedisCommand)}
+ */
+public class RedisCommandDescription implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private RedisCommand redisCommand;
+
+       /**
+        * This additional key is needed for the group {@link 
RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+        * Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+        * But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+        * <p>For {@link RedisDataType#HASH} we need hash name, hash key and 
element.
+        * {@link #getAdditionalKey()} used as hash name for {@link 
RedisDataType#HASH}
+        * <p>For {@link RedisDataType#SORTED_SET} we need set name, the 
element and it's score.
+        * {@link #getAdditionalKey()} used as set name for {@link 
RedisDataType#SORTED_SET}
+        */
+       private String additionalKey;
+
+       /**
+        * Use this constructor when data type is {@link RedisDataType#HASH} or 
{@link RedisDataType#SORTED_SET}.
+        * If different data type is specified, {@code additionalKey} is 
ignored.
+        * @param redisCommand the redis command type {@link RedisCommand}
+        * @param additionalKey additional key for Hash and Sorted set data type
+        */
+       public RedisCommandDescription(RedisCommand redisCommand, String 
additionalKey) {
+               Preconditions.checkNotNull(redisCommand, "Redis command type 
can not be null");
+               this.redisCommand = redisCommand;
+               this.additionalKey = additionalKey;
+
+               if (redisCommand.getRedisDataType() == RedisDataType.HASH ||
+                       redisCommand.getRedisDataType() == 
RedisDataType.SORTED_SET) {
+                       if (additionalKey == null) {
+                               throw new IllegalArgumentException("Hash and 
Sorted Set should have additional key");
+                       }
+               }
+       }
+
+       /**
+        * Use this constructor when command type is not in group {@link 
RedisDataType#HASH} or {@link RedisDataType#SORTED_SET}.
+        *
+        * @param redisCommand the redis data type {@link RedisCommand}
+        */
+       public RedisCommandDescription(RedisCommand redisCommand) {
+               this(redisCommand, null);
+       }
+
+       /**
+        * Returns the {@link RedisCommand}.
+        *
+        * @return the command type of the mapping
+        */
+       public RedisCommand getCommand() {
+               return redisCommand;
+       }
+
+       /**
+        * Returns the additional key if data type is {@link 
RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+        *
+        * @return the additional key
+        */
+       public String getAdditionalKey() {
+               return additionalKey;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
new file mode 100644
index 0000000..6e3997c
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available data type for Redis.
+ */
+public enum RedisDataType {
+
+       /**
+        * Strings are the most basic kind of Redis value. Redis Strings are 
binary safe,
+        * this means that a Redis string can contain any kind of data, for 
instance a JPEG image or a serialized Ruby object.
+        * A String value can be at max 512 Megabytes in length.
+        */
+       STRING,
+
+       /**
+        * Redis Hashes are maps between string fields and string values.
+        */
+       HASH,
+
+       /**
+        * Redis Lists are simply lists of strings, sorted by insertion order.
+        */
+       LIST,
+
+       /**
+        * Redis Sets are an unordered collection of Strings.
+        */
+       SET,
+
+       /**
+        * Redis Sorted Sets are, similarly to Redis Sets, non repeating 
collections of Strings.
+        * The difference is that every member of a Sorted Set is associated 
with score,
+        * that is used in order to take the sorted set ordered, from the 
smallest to the greatest score.
+        * While members are unique, scores may be repeated.
+        */
+       SORTED_SET,
+
+       /**
+        * HyperLogLog is a probabilistic data structure used in order to count 
unique things.
+        */
+       HYPER_LOG_LOG,
+
+       /**
+        * Redis implementation of publish and subscribe paradigm. Published 
messages are characterized into channels,
+        * without knowledge of what (if any) subscribers there may be.
+        * Subscribers express interest in one or more channels, and only 
receive messages
+        * that are of interest, without knowledge of what (if any) publishers 
there are.
+        */
+       PUBSUB
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
new file mode 100644
index 0000000..63fed19
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped 
to redis type.
+ *<p>Example:
+ *<pre>{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, 
String>> {
+ *    public RedisDataTypeDescription getCommandDescription() {
+ *        return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *}</pre>
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+       /**
+        * Returns descriptor which defines data type.
+        *
+        * @return data type descriptor
+        */
+       RedisCommandDescription getCommandDescription();
+
+       /**
+        * Extracts key from data.
+        *
+        * @param data source data
+        * @return key
+        */
+       String getKeyFromData(T data);
+
+       /**
+        * Extracts value from data.
+        *
+        * @param data source data
+        * @return value
+        */
+       String getValueFromData(T data);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3ab9e36c/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
 
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
new file mode 100644
index 0000000..7d98f2d
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisITCaseBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import redis.embedded.RedisServer;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public abstract class RedisITCaseBase extends 
StreamingMultipleProgramsTestBase {
+
+       public static final int REDIS_PORT = getAvailablePort();
+       public static final String REDIS_HOST = "127.0.0.1";
+
+       private static RedisServer redisServer;
+
+       @BeforeClass
+       public static void createRedisServer() throws IOException, 
InterruptedException {
+               redisServer = new RedisServer(REDIS_PORT);
+               redisServer.start();
+       }
+
+       @AfterClass
+       public static void stopRedisServer(){
+               redisServer.stop();
+       }
+}

Reply via email to