[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376320#comment-15376320
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @kl0u, IMO that is the expected behavior. The sink cannot know whether 
Redis is down until it tries to send the next record to Redis. 
Whenever a new message reaches the sink, it tries to use the connection pool, 
and only then can it throw an exception saying that it cannot send the data to Redis.
You can build a heartbeat mechanism that periodically checks whether the Redis 
server is up, and cancel the job if Redis is down. 
@mjsax please correct me if my understanding is wrong.  
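
The heartbeat idea above could be sketched roughly as follows. This is only an illustration, not code from this pull request: `RedisHeartbeat`, `probeOnce`, and the `ping` callback are hypothetical names, and the probe is passed in as a plain `BooleanSupplier` so that the sketch does not depend on any Redis client. How a detected failure would actually cancel the Flink job is left open here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: periodically probes Redis and remembers the first failure. */
final class RedisHeartbeat implements AutoCloseable {

    // Assumption: the callback returns true while Redis still answers.
    private final BooleanSupplier ping;
    private final AtomicReference<RuntimeException> failure = new AtomicReference<>();
    private ScheduledExecutorService scheduler;

    RedisHeartbeat(BooleanSupplier ping) {
        this.ping = ping;
    }

    /** Runs a single probe and records the first failure, if any. */
    void probeOnce() {
        try {
            if (!ping.getAsBoolean()) {
                failure.compareAndSet(null,
                    new IllegalStateException("Redis did not answer the heartbeat"));
            }
        } catch (RuntimeException e) {
            failure.compareAndSet(null, e);
        }
    }

    /** Starts probing on a daemon thread every periodMillis milliseconds. */
    synchronized void start(long periodMillis) {
        if (scheduler != null) {
            return; // already started
        }
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "redis-heartbeat");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(this::probeOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    boolean isHealthy() {
        return failure.get() == null;
    }

    /** Rethrows the recorded failure so the caller can fail or cancel the job. */
    void checkHealthy() {
        RuntimeException e = failure.get();
        if (e != null) {
            throw e;
        }
    }

    @Override
    public synchronized void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```

With Jedis, the `ping` callback could, for example, wrap a `PING` command and check for the `PONG` reply; that wiring is an assumption and is not shown in this thread.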


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
> Fix For: 1.1.0
>
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15373221#comment-15373221
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @subhankarb! While playing around with the sink during testing, I saw that 
if Redis goes down in the middle of a job's execution, the job has to 
wait until the next element (after the Redis failure) arrives in order to 
detect that it is down.

This is not the expected behavior, as we want the job to fail as soon as 
Redis goes down. Do you have an idea of why this is happening or how to fix it?




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365814#comment-15365814
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
Thanks @subhankarb ! Great work.
Thanks @tzulitai for helping with reviewing!




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365780#comment-15365780
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1813




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364231#comment-15364231
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/1813
  
This is the build: https://travis-ci.org/rmetzger/flink/builds/142738735




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364216#comment-15364216
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/1813
  
I'll address my last comments myself and merge the change once it's green.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364208#comment-15364208
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69717542
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.util.TestLogger;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest extends TestLogger {
+
+   private static RedisCluster cluster;
+   private static final String REDIS_MASTER = "master";
+   private static final String TEST_KEY = "testKey";
+   private static final String TEST_VALUE = "testValue";
+   private static final List<Integer> sentinels = Arrays.asList(getAvailablePort(), getAvailablePort());
+   private static final List<Integer> group1 = Arrays.asList(getAvailablePort(), getAvailablePort());
+
+   private JedisSentinelPool jedisSentinelPool;
+   private FlinkJedisSentinelConfig jedisSentinelConfig;
+
+   @BeforeClass
+   public static void setUpCluster(){
+      cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
+         .serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+         .build();
+      cluster.start();
+   }
+
+   @Before
+   public void setUp() {
+      Set<String> hosts = JedisUtil.sentinelHosts(cluster);
+      jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+         .setSentinels(hosts).build();
+      jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+         jedisSentinelConfig.getSentinels());
+   }
+
+   @Test
+   public void testRedisSentinelOperation() {
+      RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+      Jedis jedis = null;
+      try{
+         jedis = jedisSentinelPool.getResource();
+         redisContainer.set(TEST_KEY, TEST_VALUE);
+         assertEquals(TEST_VALUE, jedis.get(TEST_KEY));
+      }catch (Exception ex){
+         log.warn("Failed to get jedis resource {}", ex.getMessage());
--- End diff --

The test will not fail if an exception is thrown here, because the catch block 
only logs a warning. I still think this is almost the same as ignoring the exception.
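
To illustrate the concern, here is a minimal, framework-free sketch. It is not the test from this pull request; `SwallowDemo`, `Step`, `runSwallowing`, and `runPropagating` are made-up names. The first variant mirrors the catch-and-log pattern in the diff, so the caller cannot tell that anything went wrong. The second lets the exception escape and uses `finally` only for cleanup, which is the behavior that would make a JUnit test fail.

```java
/** Hypothetical demo contrasting a swallowed exception with a propagated one. */
final class SwallowDemo {

    /** A test body that may throw, standing in for the Redis calls in the diff. */
    interface Step {
        void run() throws Exception;
    }

    /** Catch-and-log: returns normally even when the step failed. */
    static void runSwallowing(Step step) {
        try {
            step.run();
        } catch (Exception ex) {
            System.err.println("Failed to run step: " + ex.getMessage());
        }
    }

    /** Propagating: cleanup still runs, but the failure reaches the caller. */
    static void runPropagating(Step step, Runnable cleanup) throws Exception {
        try {
            step.run();
        } finally {
            cleanup.run();
        }
    }
}
```

In a JUnit test, the second shape usually means declaring `throws Exception` on the test method and releasing the Jedis resource in the `finally` block.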



[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364180#comment-15364180
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger LGTM.





[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364135#comment-15364135
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax, @rmetzger please review. IMO it is finally ready to be merged :) 




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362308#comment-15362308
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax @subhankarb: the changes look good to me :)




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361390#comment-15361390
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@subhankarb two tiny comments
@tzulitai @rmetzger Any more comments?




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361387#comment-15361387
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69464742
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+public class FlinkJedisConfigBaseTest {
--- End diff --

This test class should extend TestLogger.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361385#comment-15361385
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69464673
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.Test;
+
+public class RedisSinkTest {
--- End diff --

This test class should extend TestLogger.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361350#comment-15361350
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax done. 




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361181#comment-15361181
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444799
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
--- End diff --

done.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361179#comment-15361179
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444769
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+
+   protected int maxTotal;
+   protected int maxIdle;
+   protected int minIdle;
+   protected int connectionTimeout;
+
+   protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+      this.connectionTimeout = connectionTimeout;
+      this.maxTotal = maxTotal;
+      this.maxIdle = maxIdle;
+      this.minIdle = minIdle;
+   }
--- End diff --

done




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361180#comment-15361180
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444781
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int maxRedirections;
+
+
+   /**
+    * Jedis cluster configuration.
+    * The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+    *
+    * @param nodes list of node information for JedisCluster
+    * @param connectionTimeout socket / connection timeout. The default is 2000
+    * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+    * @param maxTotal the maximum number of objects that can be allocated by the pool
+    * @param maxIdle the cap on the number of "idle" instances in the pool
+    * @param minIdle the minimum number of idle objects to maintain in the pool
+    * @throws NullPointerException if parameter {@code nodes} is {@code null}
+    */
+   private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+         int maxTotal, int maxIdle, int minIdle) {
+      super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+      Preconditions.checkNotNull(nodes, "Node information should be presented");
--- End diff --

done.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361182#comment-15361182
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69444815
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+*
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+* @throws NullPointerException if jedisPoolConfig is null
+*/
+   public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
--- End diff --

done.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361164#comment-15361164
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69442146
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set nodes;
+   private int maxRedirections;
+
+
+   /**
+* Jedis cluster configuration.
+* The list of node is mandatory, and when nodes is not set, it throws 
NullPointerException.
+*
+* @param nodes list of node information for JedisCluster
+* @param connectionTimeout socket / connection timeout. The default is 
2000
+* @param maxRedirections limit of redirections-how much we'll follow 
MOVED or ASK
+* @param maxTotal the maximum number of objects that can be allocated 
by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the 
pool
+* @throws NullPointerException if parameter {@code nodes} is {@code 
null}
+*/
+   private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+   int maxTotal, int maxIdle, int minIdle) {
+   super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+   Preconditions.checkNotNull(nodes, "Node information should be 
presented");
+   this.nodes = nodes;
+   this.maxRedirections = maxRedirections;
+   }
+
+
+
+   /**
+* Returns nodes.
+*
+* @return list of node information
+*/
+   public Set<HostAndPort> getNodes() {
+   Set<HostAndPort> ret = new HashSet<>();
+   for (InetSocketAddress node : nodes) {
+   ret.add(new HostAndPort(node.getHostName(), 
node.getPort()));
+   }
+   return ret;
+   }
+
+   /**
+* Returns limit of redirection.
+*
+* @return limit of redirection
+*/
+   public int getMaxRedirections() {
+   return maxRedirections;
+   }
+
+
+   /**
+* Builder for initializing  {@link FlinkJedisClusterConfig}.
+*/
+   public static class Builder {
--- End diff --

It is a static nested class. IMO we should keep it that way.
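For context, a minimal sketch of the pattern being defended here. The class name `ClusterConfigSketch`, the `String` node type, and the default of 5 redirections are hypothetical stand-ins for illustration, not the code in this PR:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a config class; NOT the PR's FlinkJedisClusterConfig.
final class ClusterConfigSketch {
    private final Set<String> nodes;
    private final int maxRedirections;

    // Private constructor: instances are created only through the Builder.
    private ClusterConfigSketch(Set<String> nodes, int maxRedirections) {
        Objects.requireNonNull(nodes, "Node information should be presented");
        Set<String> copy = new HashSet<>(nodes);
        this.nodes = Collections.unmodifiableSet(copy);
        this.maxRedirections = maxRedirections;
    }

    Set<String> getNodes() {
        return nodes;
    }

    int getMaxRedirections() {
        return maxRedirections;
    }

    // Static nested class: it holds no reference to an enclosing instance,
    // yet it may still call the private constructor of the outer class.
    static class Builder {
        private Set<String> nodes;
        private int maxRedirections = 5; // assumed default, for illustration only

        Builder setNodes(Set<String> nodes) {
            this.nodes = nodes;
            return this;
        }

        Builder setMaxRedirections(int maxRedirections) {
            this.maxRedirections = maxRedirections;
            return this;
        }

        ClusterConfigSketch build() {
            return new ClusterConfigSketch(nodes, maxRedirections);
        }
    }
}
```

Because the builder is static, callers can write `new ClusterConfigSketch.Builder()` without first having an outer instance, which is the point of keeping it nested and static.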




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361091#comment-15361091
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@subhankarb I think one more pass and we are good to go!




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361089#comment-15361089
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69432240
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+public class JedisClusterConfigTest extends TestLogger {
+
+   @Test(expected = NullPointerException.class)
+   public void shouldThrowNullPointExceptionIfNodeValueIsNull(){
+   FlinkJedisClusterConfig.Builder builder = new 
FlinkJedisClusterConfig.Builder();
+   builder.build();
+   }
--- End diff --

Add a test for the new `isEmpty()` check.
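A rough sketch of what such a test could verify, assuming the new check rejects an empty node set with `IllegalArgumentException` (as `Preconditions.checkArgument` does). `NodeCheckSketch` and its `String` element type are hypothetical stand-ins so the snippet compiles without Flink or JUnit; in the PR this would more likely be a second `@Test(expected = IllegalArgumentException.class)` method next to the existing null test:

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for the builder's validation; NOT the PR's code.
final class NodeCheckSketch {

    // Mirrors a null check followed by an isEmpty() check.
    static Set<String> validate(Set<String> nodes) {
        if (nodes == null) {
            throw new NullPointerException("Node information should be presented");
        }
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("Node information should not be empty");
        }
        return nodes;
    }

    // Returns true only if validate() rejects an empty set with IllegalArgumentException.
    static boolean rejectsEmptySet() {
        try {
            validate(Collections.<String>emptySet());
            return false; // no exception: the check is missing
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }
}
```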




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361081#comment-15361081
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69431563
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java
 ---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkITCase extends RedisITCaseBase {
+
+   private FlinkJedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
+   RedisSink> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisCommandMapper(RedisCommand.LPUSH));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
+   RedisSink> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisCommandMapper(RedisCommand.SADD));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
+   RedisSink> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisCommandMapper(RedisCommand.PFADD));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, 
Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSortedSetDataType() throws Exception {

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361075#comment-15361075
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430812
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Please use the second 
constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String key, final String hashField, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(key, hashField, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to key {} and hashField {} error message {}",
+   key, hashField, e.getMessage());
+   }
+   throw e;
+   } finally {
+   releaseInstance(jedis);
+   }
+   }
+
+   @Override
+   public void rpush(final String listName, final String value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.rpush(listName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message {}",
+ 

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361071#comment-15361071
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430592
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, please use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Please use the second 
constructor {@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
--- End diff --

Make both fields `final`, and in each constructor explicitly set the unused one to `null`.
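Roughly like the sketch below. `PoolStub` and `SentinelPoolStub` are hypothetical placeholders for `JedisPool` and `JedisSentinelPool` so the snippet compiles without Jedis, and the `usesPool()` / `usesSentinel()` helpers exist only for illustration:

```java
import java.util.Objects;

// Hypothetical placeholders for JedisPool / JedisSentinelPool.
final class PoolStub {}
final class SentinelPoolStub {}

// Sketch only; NOT the PR's RedisContainer.
final class ContainerSketch {
    private final PoolStub pool;
    private final SentinelPoolStub sentinelPool;

    ContainerSketch(PoolStub pool) {
        this.pool = Objects.requireNonNull(pool, "Jedis Pool can not be null");
        // A final field must be assigned in every constructor, so the unused one is set to null.
        this.sentinelPool = null;
    }

    ContainerSketch(SentinelPoolStub sentinelPool) {
        this.pool = null;
        this.sentinelPool = Objects.requireNonNull(sentinelPool, "Jedis Sentinel Pool can not be null");
    }

    boolean usesPool() {
        return pool != null;
    }

    boolean usesSentinel() {
        return sentinelPool != null;
    }
}
```

With both fields `final`, the compiler enforces that exactly one assignment happens per field per constructor, so the "one of the two is set" invariant cannot drift later.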




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361070#comment-15361070
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430279
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis Commands.
--- End diff --

Nit: [c]ommands




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361067#comment-15361067
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69430042
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int soTimeout;
+   private String password;
+   private int database;
+
+   /**
+* Jedis Sentinels config.
+* The master name and sentinels are mandatory, and when you didn't set 
these, it throws NullPointerException.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout timeout connection timeout
+* @param soTimeout timeout socket timeout
+* @param password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be 
allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the 
pool
+*
+* @throws NullPointerException if {@code masterName} or {@code 
sentinels} is {@code null}
+* @throws IllegalArgumentException if {@code sentinels} are empty
+*/
+   private FlinkJedisSentinelConfig(String masterName, Set 
sentinels,
+   int 
connectionTimeout, int soTimeout,
+   String 
password, int database,
+   int 
maxTotal, int maxIdle, int minIdle) {
+   super(connectionTimeout, maxTotal, maxIdle, minIdle);
+   Preconditions.checkNotNull(masterName, "Master name should be 
presented");
+   Preconditions.checkNotNull(sentinels, "Sentinels information 
should be presented");
+   Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel 
hosts should not be empty");
+
+   this.masterName = masterName;
+   this.sentinels = sentinels;
--- End diff --

Should we copy this `Set` to guard against external modifications?
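Something along these lines is what I have in mind. This is a sketch only: `SentinelSetSketch` is a hypothetical stand-in, and the `String` element type is an assumption since the generic parameter is not visible in the quoted diff:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in; NOT the PR's FlinkJedisSentinelConfig.
final class SentinelSetSketch {
    private final Set<String> sentinels;

    SentinelSetSketch(Set<String> sentinels) {
        Objects.requireNonNull(sentinels, "Sentinels information should be presented");
        if (sentinels.isEmpty()) {
            throw new IllegalArgumentException("Sentinel hosts should not be empty");
        }
        // Defensive copy: later changes to the caller's set do not affect this object.
        this.sentinels = new HashSet<>(sentinels);
    }

    // Read-only view, so callers cannot modify the internal set through the getter either.
    Set<String> getSentinels() {
        return Collections.unmodifiableSet(sentinels);
    }
}
```

Copying after validation also means the emptiness check cannot be invalidated by the caller clearing the set afterwards.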




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361066#comment-15361066
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429926
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkJedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int soTimeout;
+   private String password;
+   private int database;
--- End diff --

Make all of these fields `final`.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361065#comment-15361065
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429896
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
 ---
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
--- End diff --

Nit: [p]ool




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361062#comment-15361062
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429657
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int database;
+   private String password;
--- End diff --

Make all of these fields `final`.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361060#comment-15361060
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429555
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+/**
+ * Configuration for Jedis Pool.
--- End diff --

Nit: [p]ool




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361058#comment-15361058
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69429234
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+
+   protected int maxTotal;
+   protected int maxIdle;
+   protected int minIdle;
+   protected int connectionTimeout;
+
+   protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
+       this.connectionTimeout = connectionTimeout;
+       this.maxTotal = maxTotal;
+       this.maxIdle = maxIdle;
+       this.minIdle = minIdle;
+   }
--- End diff --

Should we check for negative values?
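
A minimal sketch of what such a check could look like, assuming the validation lives in the base-class constructor so every subclass inherits it. The class name `ValidatedConfigBase`, the helper `requireNonNegative`, and the choice to reject every negative value are assumptions for illustration only; plain `IllegalArgumentException` stands in for Flink's `Preconditions.checkArgument` so the snippet compiles on its own. Note that commons-pool2 treats a negative `maxTotal` or `maxIdle` as "no limit", so whether to reject those two is a judgment call.

```java
import java.io.Serializable;

/** Hypothetical stand-in for FlinkJedisConfigBase; names and bounds are assumptions. */
abstract class ValidatedConfigBase implements Serializable {

    private static final long serialVersionUID = 1L;

    protected final int maxTotal;
    protected final int maxIdle;
    protected final int minIdle;
    protected final int connectionTimeout;

    protected ValidatedConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
        // Fail fast at construction time instead of when the pool is first used.
        this.connectionTimeout = requireNonNegative(connectionTimeout, "connectionTimeout");
        this.maxTotal = requireNonNegative(maxTotal, "maxTotal");
        this.maxIdle = requireNonNegative(maxIdle, "maxIdle");
        this.minIdle = requireNonNegative(minIdle, "minIdle");
    }

    /** Returns the value unchanged, or throws if it is below zero. */
    static int requireNonNegative(int value, String name) {
        if (value < 0) {
            throw new IllegalArgumentException(name + " must not be negative, got " + value);
        }
        return value;
    }
}
```

With this shape a bad value surfaces when the config is built on the client, not later inside the running sink.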




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361049#comment-15361049
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428684
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int maxRedirections;
+
+
+   /**
+    * Jedis cluster configuration.
+    * The list of node is mandatory, and when nodes is not set, it throws NullPointerException.
+    *
+    * @param nodes list of node information for JedisCluster
+    * @param connectionTimeout socket / connection timeout. The default is 2000
+    * @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK
+    * @param maxTotal the maximum number of objects that can be allocated by the pool
+    * @param maxIdle the cap on the number of "idle" instances in the pool
+    * @param minIdle the minimum number of idle objects to maintain in the pool
+    * @throws NullPointerException if parameter {@code nodes} is {@code null}
+    */
+   private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
+                                   int maxTotal, int maxIdle, int minIdle) {
+       super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+       Preconditions.checkNotNull(nodes, "Node information should be presented");
--- End diff --

Should we throw on `nodes.isEmpty()` ?
Maybe also make a copy of the `Set` to ensure it cannot be altered from 
outside?
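
Both suggestions could be sketched roughly as follows. This is only an illustration under assumptions: `ClusterNodes` is a hypothetical stand-in that shows the node handling alone, the error message for the empty case is made up, and `Objects.requireNonNull` plus `IllegalArgumentException` replace Flink's `Preconditions` so the snippet has no external dependencies.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for the cluster config; only the node handling is shown. */
final class ClusterNodes {

    private final Set<InetSocketAddress> nodes;

    ClusterNodes(Set<InetSocketAddress> nodes) {
        Objects.requireNonNull(nodes, "Node information should be presented");
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("Redis cluster hosts should not be empty");
        }
        // Defensive copy: later changes to the caller's set do not leak into the config.
        this.nodes = new HashSet<>(nodes);
    }

    Set<InetSocketAddress> getNodes() {
        // Read-only view so callers cannot modify the internal set either.
        return Collections.unmodifiableSet(nodes);
    }
}
```

Copying in the constructor and exposing a read-only view also makes it straightforward to declare the field `final`.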




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361045#comment-15361045
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428463
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int maxRedirections;
--- End diff --

`final`




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361044#comment-15361044
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428451
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
--- End diff --

`final`




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361041#comment-15361041
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428276
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
--- End diff --

Nit: [c]luster




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361038#comment-15361038
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69428038
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *  The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
+ *  When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ *  When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
+ *  Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
+ * a Redis Cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ *    private RedisCommand redisCommand;
+ *
+ *    public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ *        this.redisCommand = redisCommand;
+ *    }
+ *    public RedisCommandDescription getCommandDescription() {
+ *        return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+    * This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+    * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+    * But for {@link RedisDataType#HASH} and {@link Re

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361037#comment-15361037
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69427880
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *  The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}.
+ *  When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ *  When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel.
+ *  Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to
+ * a Redis Cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ *    private RedisCommand redisCommand;
+ *
+ *    public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ *        this.redisCommand = redisCommand;
+ *    }
+ *    public RedisCommandDescription getCommandDescription() {
+ *        return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ *    }
+ *    public String getKeyFromData(Tuple2<String, String> data) {
+ *        return data.f0;
+ *    }
+ *    public String getValueFromData(Tuple2<String, String> data) {
+ *        return data.f1;
+ *    }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *    .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+    * This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}.
+    * Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added.
+    * But for {@link RedisDataType#HASH} and {@link Re

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361034#comment-15361034
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69427481
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with redis 2.8.5 .
+
+Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Redis
+Follow the instructions from the [Redis download page](http://redis.io/download).
+
+#### Redis Sink
+A class providing an interface for sending data to Redis. 
+The sink can use three different methods for communicating with different type of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+
+This code shows how to create a sink that communicate to a single redis server:
+
+
+
+{% highlight java %}
+public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
+
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
+    }
+
+    @Override
+    public String getKeyFromData(Tuple2<String, String> data) {
+        return data.f0;
+    }
+
+    @Override
+    public String getValueFromData(Tuple2<String, String> data) {
+        return data.f1;
+    }
+}
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getCommandDescription: RedisCommandDescription = {
+    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
+  }
+
+  override def getKeyFromData(data: (String, String)): String = data._1
+
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example code does the same, but for Redis Cluster:
+
+
+
+{% highlight java %}
+
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example shows when the Redis environment is with Sentinels:
+
+
+
+{% highlight java %}
+
+FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
+    .setMasterName("master").setSentinels(...).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This section gives a description of all the available data types and what redis command used for that.
+
+
+
+
+  Data Type
+  Redis Command 
[Sink]
+  Redis Command 
[Source]
+
+  
+  
+
+HASHhttp://redis.io/commands/hset";> 
HSET--NA--
+
+
+L

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361033#comment-15361033
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69427444
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with redis 2.8.5 .
+
+Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Redis
+Follow the instructions from the [Redis download page](http://redis.io/download).
+
+#### Redis Sink
+A class providing an interface for sending data to Redis. 
+The sink can use three different methods for communicating with different type of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+
+This code shows how to create a sink that communicate to a single redis server:
+
+
+
+{% highlight java %}
+public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
+
+    @Override
+    public RedisCommandDescription getCommandDescription() {
+        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
+    }
+
+    @Override
+    public String getKeyFromData(Tuple2<String, String> data) {
+        return data.f0;
+    }
+
+    @Override
+    public String getValueFromData(Tuple2<String, String> data) {
+        return data.f1;
+    }
+}
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getCommandDescription: RedisCommandDescription = {
+    new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
+  }
+
+  override def getKeyFromData(data: (String, String)): String = data._1
+
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example code does the same, but for Redis Cluster:
+
+
+
+{% highlight java %}
+
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+    .setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new InetSocketAddress(5601)))).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example shows when the Redis environment is with Sentinels:
+
+
+
+{% highlight java %}
+
+FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
+    .setMasterName("master").setSentinels(...).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This section gives a description of all the available data types and what redis command used for that.
+
+
+
+
+  Data Type
+  Redis Command 
[Sink]
+  Redis Command 
[Source]
+
+  
+  
+
+HASHhttp://redis.io/commands/hset";> 
HSET--NA--
--- End diff --

Nit: remove blank

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361032#comment-15361032
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69427347
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with redis 2.8.5 .
+
+Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+#### Installing Redis
+Follow the instructions from the [Redis download page](http://redis.io/download).
+
+#### Redis Sink
+A class providing an interface for sending data to Redis. 
+The sink can use three different methods for communicating with different type of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+
+This code shows how to create a sink that communicate to a single redis server:
+
+
+
+{% highlight java %}
+public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>>{
+
+@Override
+public RedisCommandDescription getCommandDescription() {
+return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
+}
+
+@Override
+public String getKeyFromData(Tuple2<String, String> data) {
+return data.f0;
+}
+
+@Override
+public String getValueFromData(Tuple2<String, String> data) {
+return data.f1;
+}
+}
+FlinkJedisPoolConfig conf = new 
FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getCommandDescription: RedisCommandDescription = {
+new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")
+  }
+
+  override def getKeyFromData(data: (String, String)): String = data._1
+
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example code does the same, but for Redis Cluster:
+
+
+
+{% highlight java %}
+
+FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
+.setNodes(new HashSet<InetSocketAddress>(Arrays.asList(new 
InetSocketAddress(5601)))).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example shows when the Redis environment is with Sentinels:
+
+
+
+{% highlight java %}
+
+FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder()
+.setMasterName("master").setSentinels(...).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new 
FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This section gives a description of all the available data types and what 
redis command used for that.
--- End diff --

[R]edis



[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361028#comment-15361028
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69427102
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis 
PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with redis 2.8.5 .
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See linking with them for cluster execution 
[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
--- End diff --

Style: Replace "See linking with them..." with "You need to link them for 
cluster execution explicitly.", using "explicitly" as the link text, i.e., it 
replaces "here".




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361027#comment-15361027
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69426825
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis 
PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+Version Compatibility: This module is compatible with redis 2.8.5 .
--- End diff --

Nit. Should there be a whitespace after the version number?




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360878#comment-15360878
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @subhankarb,
Thank you for addressing our comments. I have some last nit-pick comments 
to make the code just a little better, otherwise the changes LGTM :)
Perhaps we should also wait for @mjsax to give another review on the last 
changes too, before you get on to addressing the latest comments.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360868#comment-15360868
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69410536
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
+
+   protected int maxTotal;
+   protected int maxIdle;
+   protected int minIdle;
+   protected int connectionTimeout;
--- End diff --

Nit: I would add the `final` modifier to these fields; they should not 
change after the user creates the config.
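
To illustrate the suggestion, here is a minimal sketch of a config base class with `final` fields. The constructor argument order follows the `super(connectionTimeout, maxTotal, maxIdle, minIdle)` call visible in `FlinkJedisClusterConfig`; the class names `ImmutableConfigBaseSketch` and `ExamplePoolConfigSketch`, the getters, and the `host` field are hypothetical stand-ins and not the PR's code.

```java
import java.io.Serializable;

// Hypothetical stand-in for FlinkJedisConfigBase; not the PR's actual source.
abstract class ImmutableConfigBaseSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // final: each field is assigned exactly once in the constructor,
    // so a config cannot be modified after it has been created.
    protected final int maxTotal;
    protected final int maxIdle;
    protected final int minIdle;
    protected final int connectionTimeout;

    protected ImmutableConfigBaseSketch(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
        this.connectionTimeout = connectionTimeout;
        this.maxTotal = maxTotal;
        this.maxIdle = maxIdle;
        this.minIdle = minIdle;
    }

    public int getMaxTotal() { return maxTotal; }
    public int getMaxIdle() { return maxIdle; }
    public int getMinIdle() { return minIdle; }
    public int getConnectionTimeout() { return connectionTimeout; }
}

// Hypothetical concrete subclass, present only so the sketch can be instantiated.
final class ExamplePoolConfigSketch extends ImmutableConfigBaseSketch {

    private static final long serialVersionUID = 1L;

    private final String host;

    ExamplePoolConfigSketch(String host, int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
        super(connectionTimeout, maxTotal, maxIdle, minIdle);
        this.host = host;
    }

    public String getHost() { return host; }
}
```

With `final` in place, any later assignment such as `config.maxTotal = 16` is rejected by the compiler, which is the guarantee the comment asks for.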




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360861#comment-15360861
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69410042
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+
+import java.io.Serializable;
+
+/**
+ * Base class for Flink Redis configuration.
+ */
+public abstract class FlinkJedisConfigBase implements Serializable {
--- End diff --

This is missing a `serialVersionUID`.
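
For context, a short sketch of what the missing field looks like and how it can be checked. `ConfigBaseWithUid` and `SerialUidCheck` are hypothetical names used only for illustration; `ObjectStreamClass.lookup` is the standard JDK call for reading the serial version UID that Java serialization will use for a class.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-in for the reviewed base class.
abstract class ConfigBaseWithUid implements Serializable {
    // Declared explicitly so the serialized form does not depend on a
    // default value derived from the details of the class.
    private static final long serialVersionUID = 1L;
}

final class SerialUidCheck {
    // Returns the UID that Java serialization will use for the given Serializable class.
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }
}
```

Calling `SerialUidCheck.uidOf(ConfigBaseWithUid.class)` reads back the declared value instead of a computed default.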




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360854#comment-15360854
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69409744
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *  The sink takes two arguments {@link FlinkJedisConfigBase} and 
{@link RedisMapper}.
+ *  When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ *  When {@link FlinkJedisSentinelConfig} is passed as the first 
argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this 
when you want to connect to Sentinel.
+ *  Please use {@link FlinkJedisClusterConfig} as the first argument if 
you want to connect to
+ * a Redis Cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisCommandDescription getCommandDescription() {
+ * return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
--- End diff --

This part in the Javadoc seems to be wrong too.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360846#comment-15360846
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69409336
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int maxRedirections;
+
+
+   /**
+* Jedis cluster configuration.
+* The list of node is mandatory, and when nodes is not set, it throws 
NullPointerException.
+*
+* @param nodes list of node information for JedisCluster
+* @param connectionTimeout socket / connection timeout. The default is 
2000
+* @param maxRedirections limit of redirections-how much we'll follow 
MOVED or ASK
+* @param maxTotal the maximum number of objects that can be allocated 
by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the 
pool
+* @throws NullPointerException if parameter {@code nodes} is {@code 
null}
+*/
+   private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int 
connectionTimeout, int maxRedirections,
+   int 
maxTotal, int maxIdle, int minIdle) {
+   super(connectionTimeout, maxTotal, maxIdle, minIdle);
+
+   Preconditions.checkNotNull(nodes, "Node information should be 
presented");
+   this.nodes = nodes;
+   this.maxRedirections = maxRedirections;
+   }
+
+
+
+   /**
+* Returns nodes.
+*
+* @return list of node information
+*/
+   public Set<HostAndPort> getNodes() {
+   Set<HostAndPort> ret = new HashSet<>();
+   for (InetSocketAddress node : nodes) {
+   ret.add(new HostAndPort(node.getHostName(), 
node.getPort()));
+   }
+   return ret;
+   }
+
+   /**
+* Returns limit of redirection.
+*
+* @return limit of redirection
+*/
+   public int getMaxRedirections() {
+   return maxRedirections;
+   }
+
+
+   /**
+* Builder for initializing  {@link FlinkJedisClusterConfig}.
+*/
+   public static class Builder {
--- End diff --

There also seems to be duplicate code for the three `Builder`s.
Can we consolidate the commonly shared code of the `Builder`s into 
`FlinkConfigConsumerBase` too? 
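
One possible way to share the common setters is a generic, self-typed builder base class, sketched below. All names (`BuilderBaseSketch`, `ClusterBuilderSketch`, `PoolBuilderSketch`) and the default values are assumptions made for this illustration; the `build()` methods return a summary string only so the sketch stays free of Jedis and Flink dependencies.

```java
// Hypothetical sketch: pool settings shared by all builders live in one generic base class.
abstract class BuilderBaseSketch<B extends BuilderBaseSketch<B>> {

    // Illustrative defaults; 2000 matches the timeout default named in the Javadoc above.
    protected int maxTotal = 8;
    protected int maxIdle = 8;
    protected int minIdle = 0;
    protected int connectionTimeout = 2000;

    // Each concrete builder returns itself, so chained setters keep the concrete type.
    protected abstract B self();

    public B setMaxTotal(int maxTotal) { this.maxTotal = maxTotal; return self(); }
    public B setMaxIdle(int maxIdle) { this.maxIdle = maxIdle; return self(); }
    public B setMinIdle(int minIdle) { this.minIdle = minIdle; return self(); }
    public B setTimeout(int timeout) { this.connectionTimeout = timeout; return self(); }
}

// Cluster-specific builder: only adds what the base class does not cover.
final class ClusterBuilderSketch extends BuilderBaseSketch<ClusterBuilderSketch> {

    private int maxRedirections = 5; // illustrative default

    @Override
    protected ClusterBuilderSketch self() { return this; }

    public ClusterBuilderSketch setMaxRedirections(int maxRedirections) {
        this.maxRedirections = maxRedirections;
        return this;
    }

    // A real builder would construct the config object here; the sketch returns a summary.
    public String build() {
        return "cluster[maxTotal=" + maxTotal + ", maxRedirections=" + maxRedirections
            + ", timeout=" + connectionTimeout + "]";
    }
}

// Single-server builder: adds the host setting.
final class PoolBuilderSketch extends BuilderBaseSketch<PoolBuilderSketch> {

    private String host = "localhost"; // illustrative default

    @Override
    protected PoolBuilderSketch self() { return this; }

    public PoolBuilderSketch setHost(String host) {
        this.host = host;
        return this;
    }

    public String build() {
        return "pool[host=" + host + ", maxTotal=" + maxTotal + ", timeout=" + connectionTimeout + "]";
    }
}
```

The self-type parameter `B` is what lets a call like `new ClusterBuilderSketch().setMaxTotal(16).setMaxRedirections(3)` compile: the inherited setter returns the concrete builder rather than the base class.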




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360842#comment-15360842
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69409088
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+/**
+ * The builder for {@link RedisCommandsContainer}.
+ */
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+*
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+* @throws NullPointerException if jedisPoolConfig is null
+*/
+   public static RedisCommandsContainer build(FlinkJedisPoolConfig 
jedisPoolConfig) {
--- End diff --

Perhaps the `build` functions can make use of `FlinkJedisConfigBase` 
too?
This way `RedisSink#open()` can be even simpler.
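
A rough sketch of the idea, assuming a single `build` entry point that accepts the base type and dispatches on the concrete config class. Every type here (`ConfigBaseSketch`, `PoolConfigSketch`, `ClusterConfigSketch`, `SentinelConfigSketch`, `ContainerSketch`, `ContainerBuilderSketch`) is a hypothetical stand-in, and the returned containers only describe themselves instead of opening Jedis connections.

```java
import java.util.Objects;

// Hypothetical stand-ins for the three config classes and their common base.
abstract class ConfigBaseSketch { }
final class PoolConfigSketch extends ConfigBaseSketch { }
final class ClusterConfigSketch extends ConfigBaseSketch { }
final class SentinelConfigSketch extends ConfigBaseSketch { }

// Hypothetical stand-in for RedisCommandsContainer.
interface ContainerSketch {
    String describe();
}

final class ContainerBuilderSketch {

    // Single entry point: the caller passes the base type and never branches itself.
    static ContainerSketch build(ConfigBaseSketch config) {
        Objects.requireNonNull(config, "Redis connection configuration must not be null");
        if (config instanceof PoolConfigSketch) {
            return () -> "single-server container";
        } else if (config instanceof ClusterConfigSketch) {
            return () -> "cluster container";
        } else if (config instanceof SentinelConfigSketch) {
            return () -> "sentinel container";
        }
        throw new IllegalArgumentException(
            "Unsupported configuration type: " + config.getClass().getName());
    }
}
```

Under this assumption, `open()` would reduce to one call such as `ContainerBuilderSketch.build(config)`, with the type checks kept inside the builder.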




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360838#comment-15360838
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69408897
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *  The sink takes two arguments {@link FlinkJedisConfigBase} and 
{@link RedisMapper}.
+ *  When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ *  When {@link FlinkJedisSentinelConfig} is passed as the first 
argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this 
when you want to connect to Sentinel.
+ *  Please use {@link FlinkJedisClusterConfig} as the first argument if 
you want to connect to
+ * a Redis Cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisCommandDescription getCommandDescription() {
+ * return new RedisCommandDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360835#comment-15360835
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69408867
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *  The sink takes two arguments {@link FlinkJedisConfigBase} and 
{@link RedisMapper}.
+ *  When {@link FlinkJedisPoolConfig} is passed as the first argument,
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}. Please use this when
+ * you want to connect to a single Redis server.
+ *  When {@link FlinkJedisSentinelConfig} is passed as the first 
argument, the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this 
when you want to connect to Sentinel.
+ *  Please use {@link FlinkJedisClusterConfig} as the first argument if 
you want to connect to
+ * a Redis Cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
--- End diff --

Wrong constructor name in the example in the Javadoc?
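
To illustrate the point, here is a minimal sketch in which the constructor name matches the class name. `SketchMapper` is a hypothetical, simplified stand-in for the connector's `RedisMapper`, `Map.Entry` stands in for Flink's `Tuple2`, and the command is passed as a plain string; none of these are the connector's real types.

```java
import java.util.Map;

/** Hypothetical, simplified stand-in for the connector's mapper interface. */
interface SketchMapper<T> {
    String getKeyFromData(T data);

    String getValueFromData(T data);
}

/** Example mapper; the constructor carries the same name as the class. */
class RedisExampleMapper implements SketchMapper<Map.Entry<String, String>> {

    // Plain string used here instead of the connector's command enum (assumption).
    private final String commandName;

    RedisExampleMapper(String commandName) {
        this.commandName = commandName;
    }

    String getCommandName() {
        return commandName;
    }

    @Override
    public String getKeyFromData(Map.Entry<String, String> data) {
        return data.getKey();
    }

    @Override
    public String getValueFromData(Map.Entry<String, String> data) {
        return data.getValue();
    }
}
```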


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360821#comment-15360821
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69407871
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,177 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis 
PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-redis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+Version Compatibility: This module is compatible with redis 2.8.5 .
--- End diff --

"redis", R should always be capitalized




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359975#comment-15359975
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69372418
  
--- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
@@ -0,0 +1,86 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-redis_2.10
+   flink-connector-redis
+
+   jar
+
+   
+   2.8.0
--- End diff --

added in doc.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359974#comment-15359974
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69372416
  
--- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
@@ -0,0 +1,86 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-redis_2.10
+   flink-connector-redis
+
+   jar
+
+   
+   2.8.0
+   
+
+   
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
--- End diff --

done.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-07-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358660#comment-15358660
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69266892
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not 
be null");
+
+   this.jedisCluster = jedisCluster;
+   }
+
+   @Override
+   public void hset(final String key, final String hashField, final String 
value) {
+   try {
+   jedisCluster.hset(key, hashField, value);
+   } catch (Exception e) {
--- End diff --

From the look of the Jedis code, all exceptions thrown by Jedis are 
`RuntimeException`s (see 
https://github.com/xetorthio/jedis/tree/master/src/main/java/redis/clients/jedis/exceptions),
so the API does not declare any exceptions that callers are expected to catch.
I think for this first version of the Redis Sink, we should simply fail 
(and log the failure) by rethrowing the exception. For users, I think it would 
be better to offer "log the failure without failing hard" as an option, like 
what the Kafka producer does (we can perhaps add this functionality later on).
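
A minimal sketch of the "log and rethrow" behaviour described above, with the log-only mode as an opt-in flag. `HashClient`, `RethrowingContainer` and `logFailuresOnly` are hypothetical names used for illustration; the sketch uses `java.util.logging` instead of SLF4J so that it stays self-contained, and it is not the connector's actual implementation.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the Jedis client call used by the container. */
interface HashClient {
    void hset(String key, String hashField, String value);
}

/** Sketch of a container that logs a failed write and, by default, rethrows it. */
class RethrowingContainer {

    private static final Logger LOG =
        Logger.getLogger(RethrowingContainer.class.getName());

    private final HashClient client;

    // Hypothetical option: when true, failures are logged but not propagated.
    private final boolean logFailuresOnly;

    RethrowingContainer(HashClient client, boolean logFailuresOnly) {
        this.client = client;
        this.logFailuresOnly = logFailuresOnly;
    }

    void hset(String key, String hashField, String value) {
        try {
            client.hset(key, hashField, value);
        } catch (RuntimeException e) {
            // Always log, so the failure is visible in either mode.
            LOG.log(Level.SEVERE,
                "Cannot send HSET for key " + key + " and field " + hashField, e);
            if (!logFailuresOnly) {
                // Default: propagate the failure to the caller.
                throw e;
            }
        }
    }
}
```

Catching `RuntimeException` rather than `Exception` matches the observation that the client only throws unchecked exceptions; rethrowing the same instance keeps the original stack trace.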




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358383#comment-15358383
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69247416
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not 
be null");
+
+   this.jedisCluster = jedisCluster;
+   }
+
+   @Override
+   public void hset(final String key, final String hashField, final String 
value) {
+   try {
+   jedisCluster.hset(key, hashField, value);
+   } catch (Exception e) {
--- End diff --

I am not sure what to do here. Jedis does not declare any checked or 
unchecked exceptions on these methods. Should we keep the code as it is, or 
log the failure and rethrow the exception? @mjsax, @tzulitai what do you think?




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358366#comment-15358366
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69246112
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig implements Serializable {
--- End diff --

@tzulitai `soTimeout` is an extra config for `JedisSentinel`. I am moving the 
other common configs to a base class. I really like the idea, thank you.
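
A rough sketch of that refactoring, assuming that a connection timeout and a pool size are among the shared settings. All class and field names below are hypothetical apart from `soTimeout`, and the set of shared fields is an assumption rather than what the pull request ended up with.

```java
import java.io.Serializable;

/** Hypothetical base class holding the settings shared by all config variants. */
abstract class SketchConfigBase implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int connectionTimeout;
    private final int maxTotal;

    SketchConfigBase(int connectionTimeout, int maxTotal) {
        if (connectionTimeout < 0) {
            throw new IllegalArgumentException("connection timeout can not be negative");
        }
        if (maxTotal < 0) {
            throw new IllegalArgumentException("maxTotal can not be negative");
        }
        this.connectionTimeout = connectionTimeout;
        this.maxTotal = maxTotal;
    }

    int getConnectionTimeout() {
        return connectionTimeout;
    }

    int getMaxTotal() {
        return maxTotal;
    }
}

/** Single-server variant: only adds host and port. */
class SketchPoolConfig extends SketchConfigBase {

    private static final long serialVersionUID = 1L;

    private final String host;
    private final int port;

    SketchPoolConfig(String host, int port, int connectionTimeout, int maxTotal) {
        super(connectionTimeout, maxTotal);
        this.host = host;
        this.port = port;
    }

    String getHost() {
        return host;
    }

    int getPort() {
        return port;
    }
}

/** Sentinel variant: keeps the extra soTimeout setting to itself. */
class SketchSentinelConfig extends SketchConfigBase {

    private static final long serialVersionUID = 1L;

    private final String masterName;
    private final int soTimeout;

    SketchSentinelConfig(String masterName, int soTimeout, int connectionTimeout, int maxTotal) {
        super(connectionTimeout, maxTotal);
        this.masterName = masterName;
        this.soTimeout = soTimeout;
    }

    String getMasterName() {
        return masterName;
    }

    int getSoTimeout() {
        return soTimeout;
    }
}
```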




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357270#comment-15357270
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
@subhankarb We should also add Redis Sink to the fault tolerance guarantee 
table for the connectors in the documentation. It can be found at 
`flink/docs/api/streaming/fault_tolerance.md`.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357253#comment-15357253
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69152556
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operations.
+*/
+   LPUSH(RedisDataType.LIST),
+   /**
+* Insert the specified value at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*/
+   RPUSH(RedisDataType.LIST),
+
+   /**
+* Add the specified member to the set stored at key.
+* Specified member that is already a member of this set is ignored.
+*/
+   SADD(RedisDataType.SET),
+
+   /**
+* Set key to hold the string value. If key already holds a value,
+* it is overwritten, regardless of its type.
+*/
+   SET(RedisDataType.STRING),
+
+   /**
+* Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+*/
+   PFADD(RedisDataType.HYPER_LOG_LOG),
+
+   /**
+* Posts a message to the given channel.
+*/
+   PUBLISH(RedisDataType.PUBSUB),
+
+   /**
+* Adds the specified members with the specified score to the sorted 
set stored at key.
+*/
+   ZADD(RedisDataType.SORTED_SET),
+
+   /**
+* Sets field in the hash stored at key to value. If key does not exist,
+* a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+*/
+   HSET(RedisDataType.HASH);
+
+   /**
+* The {@link RedisDataType} this command belongs to
+*/
+   private RedisDataType redisDataType;
+
+   RedisCommand(RedisDataType redisDataType) {
+   this.redisDataType = redisDataType;
+   }
+
+
+   public boolean isInRedisDataType(RedisDataType redisDataType) {
+   return this.redisDataType == redisDataType;
+   }
+
+   /**
+* The {@link RedisDataType} this command belongs to.
+* @return the {@link RedisDataType}
+ */
--- End diff --

Indentation error: should be tab followed by a space




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357245#comment-15357245
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @mjsax @subhankarb,
I did a first pass over the code; please let me know if you have any 
questions about the comments. I've also tested the connector again on local 
single-node and cluster installations, and it seems to work fine.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357233#comment-15357233
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69151056
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements 
RedisMapper> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2 data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2 data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357230#comment-15357230
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69150728
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operations.
+*/
+   LPUSH(RedisDataType.LIST),
+   /**
+* Insert the specified value at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*/
+   RPUSH(RedisDataType.LIST),
+
+   /**
+* Add the specified member to the set stored at key.
+* Specified member that is already a member of this set is ignored.
+*/
+   SADD(RedisDataType.SET),
+
+   /**
+* Set key to hold the string value. If key already holds a value,
+* it is overwritten, regardless of its type.
+*/
+   SET(RedisDataType.STRING),
+
+   /**
+* Adds the element to the HyperLogLog data structure stored at the 
variable name specified as first argument.
+*/
+   PFADD(RedisDataType.HYPER_LOG_LOG),
+
+   /**
+* Posts a message to the given channel.
+*/
+   PUBLISH(RedisDataType.PUBSUB),
+
+   /**
+* Adds the specified members with the specified score to the sorted 
set stored at key.
+*/
+   ZADD(RedisDataType.SORTED_SET),
+
+   /**
+* Sets field in the hash stored at key to value. If key does not exist,
+* a new key holding a hash is created. If field already exists in the 
hash, it is overwritten.
+*/
+   HSET(RedisDataType.HASH);
+
+   /**
+* The {@link RedisDataType} this command belongs to
+*/
+   private RedisDataType redisDataType;
+
+   RedisCommand(RedisDataType redisDataType) {
+   this.redisDataType = redisDataType;
+   }
+
+
+   public boolean isInRedisDataType(RedisDataType redisDataType) {
+   return this.redisDataType == redisDataType;
+   }
+
+   /**
+* The {@link RedisDataType} this command belongs to.
+* @return the {@link RedisDataType}
+ */
+   public RedisDataType getRedisDataType(){
--- End diff --

This method isn't used anywhere in the code apart from tests.
Perhaps we can drop the `isInRedisDataType` method, and use 
`getRedisDataType` in `RedisCommandDescription` to do the `HASH` / `SORTED_SET` 
check?
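
A minimal sketch of that suggestion, assuming the description class is where the additional key is validated. `SketchDataType`, `SketchCommand` and `SketchCommandDescription` are simplified, hypothetical stand-ins for `RedisDataType`, `RedisCommand` and `RedisCommandDescription`, not the code in the pull request.

```java
import java.util.Objects;

/** Simplified stand-in for the data type enum. */
enum SketchDataType { HASH, LIST, SET, SORTED_SET, STRING, HYPER_LOG_LOG, PUBSUB }

/** Simplified stand-in for the command enum; it exposes only a getter. */
enum SketchCommand {
    LPUSH(SketchDataType.LIST),
    SADD(SketchDataType.SET),
    ZADD(SketchDataType.SORTED_SET),
    HSET(SketchDataType.HASH);

    private final SketchDataType dataType;

    SketchCommand(SketchDataType dataType) {
        this.dataType = dataType;
    }

    SketchDataType getDataType() {
        return dataType;
    }
}

/** The description does the HASH / SORTED_SET check through the getter. */
final class SketchCommandDescription {

    private final SketchCommand command;
    private final String additionalKey;

    SketchCommandDescription(SketchCommand command, String additionalKey) {
        this.command = Objects.requireNonNull(command, "command must not be null");

        SketchDataType type = command.getDataType();
        boolean needsAdditionalKey =
            type == SketchDataType.HASH || type == SketchDataType.SORTED_SET;
        if (needsAdditionalKey && additionalKey == null) {
            throw new IllegalArgumentException(
                "Hash and sorted set commands need an additional key");
        }
        this.additionalKey = additionalKey;
    }

    SketchCommand getCommand() {
        return command;
    }

    String getAdditionalKey() {
        return additionalKey;
    }
}
```

With this shape the enum no longer needs a separate membership method, and the getter has a caller outside the tests.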




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357221#comment-15357221
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69149847
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+/**
+ * All available commands for Redis. Each command belongs to a {@link 
RedisDataType} group.
+ */
+public enum RedisCommand {
--- End diff --

From the code, it is unclear why each enum value of `RedisCommand` wraps a 
`RedisDataType`. I understand there was a previous discussion about allowing 
multiple commands per data type, but I'm not sure how this enum design relates 
to that. The `getRedisDataType()` method of the enumeration isn't used anywhere 
in the code or in user-facing interfaces. Am I missing something?




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357164#comment-15357164
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69142414
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The container for all available Redis Commands.
+ */
+public interface RedisCommandsContainer extends Serializable {
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param key Hash name
+* @param hashField Hash field
+* @param value Hash value
+*/
+   void hset(String key, String hashField, String value);
+
+   /**
+* Insert the specified value at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*
+* @param listName Name of the List
+* @param value  Value to be added
+*/
+   void rpush(String listName, String value);
+
+   /**
+* Insert the specified value at the head of the list stored at key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*
+* @param listName Name of the List
+* @param value  Value to be added
+*/
+   void lpush(String listName, String value);
+
+   /**
+* Add the specified member to the set stored at key.
+* Specified members that are already a member of this set are ignored.
+* If key does not exist, a new set is created before adding the 
specified members.
+*
+* @param setName Name of the Set
+* @param value Value to be added
+*/
+   void sadd(String setName, String value);
+
+   /**
+* Posts a message to the given channel.
+*
+* @param channelName Name of the channel to which data will be 
published
+* @param message the message
+*/
+   void  publish(String channelName, String message);
--- End diff --

extra space before `publish`


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357162#comment-15357162
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69142275
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not 
be null");
+
+   this.jedisCluster = jedisCluster;
+   }
+
+   @Override
+   public void hset(final String key, final String hashField, final String 
value) {
+   try {
+   jedisCluster.hset(key, hashField, value);
+   } catch (Exception e) {
--- End diff --

Seems like we aren't failing on record failures for all command types. 
Should we also document this behaviour?
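
To make the two possible behaviours concrete, the sketch below contrasts them. It is an assumption-laden illustration: the body of the `catch` block is not visible in the quoted diff, and `HashWriter` is a hypothetical stand-in for the client call, not the Jedis API.

```java
final class FailurePolicySketch {

    // Hypothetical stand-in for the client call made by the container.
    interface HashWriter {
        void hset(String key, String hashField, String value);
    }

    // Variant A: log and continue. The failed record is dropped and the job keeps running.
    static boolean writeAndSwallow(HashWriter client, String key, String field, String value) {
        try {
            client.hset(key, field, value);
            return true;
        } catch (Exception e) {
            System.err.println("Cannot send data to Redis: " + e.getMessage());
            return false;
        }
    }

    // Variant B: rethrow. The exception reaches the caller, so the sink fails on this record.
    static void writeOrFail(HashWriter client, String key, String field, String value) {
        try {
            client.hset(key, field, value);
        } catch (Exception e) {
            throw new RuntimeException("Cannot send data to Redis for key " + key, e);
        }
    }

    public static void main(String[] args) {
        HashWriter broken = (k, f, v) -> {
            throw new IllegalStateException("connection refused");
        };
        System.out.println("swallowed, written = " + writeAndSwallow(broken, "h", "f", "v"));
        try {
            writeOrFail(broken, "h", "f", "v");
        } catch (RuntimeException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

Whichever variant is chosen, applying it uniformly to every command type and stating it in the Javadoc would remove the ambiguity.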


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357136#comment-15357136
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69138670
  
--- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
@@ -0,0 +1,86 @@
+
+
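+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-connectors</artifactId>
+      <version>1.1-SNAPSHOT</version>
+      <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-connector-redis_2.10</artifactId>
+   <name>flink-connector-redis</name>
+
+   <packaging>jar</packaging>
+
+   <properties>
+      <jedis.version>2.8.0</jedis.version>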
--- End diff --

What Redis versions are we supporting with this Jedis version? Perhaps we 
should have info about this in the documentation.


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357121#comment-15357121
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69137523
  
--- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml ---
@@ -0,0 +1,86 @@
+
+
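+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-connectors</artifactId>
+      <version>1.1-SNAPSHOT</version>
+      <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-connector-redis_2.10</artifactId>
+   <name>flink-connector-redis</name>
+
+   <packaging>jar</packaging>
+
+   <properties>
+      <jedis.version>2.8.0</jedis.version>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>org.apache.flink</groupId>
+         <artifactId>flink-streaming-java_2.10</artifactId>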
--- End diff --

This dependency should be in the `provided` scope. Connector jars don't need 
to be packaged with the streaming Java API jar.


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357105#comment-15357105
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69135245
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Function that creates the description how the input data should be mapped to redis type.
+ *Example:
+ *{@code
+ *private static class RedisTestMapper implements RedisMapper<Tuple2<String, String>> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(RedisCommand.PUBLISH);
+ *}
+ *public String getKeyFromData(Tuple2<String, String> data) {
+ *return data.f0;
+ *}
+ *public String getValueFromData(Tuple2<String, String> data) {
+ *return data.f1;
+ *}
+ *}
+ *}
+ *
+ * @param <T> The type of the element handled by this {@code RedisMapper}
+ */
+public interface RedisMapper<T> extends Function, Serializable {
+
+   /**
+* Returns descriptor which defines data type.
+*
+* @return data type descriptor
+*/
+   RedisCommandDescription getDataTypeDescription();
--- End diff --

Now that this is renamed to `RedisCommandDescription`, should 
`getDataTypeDescription` be renamed accordingly too?
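
As a rough sketch of what the rename could look like from a user's point of view: `getCommandDescription` is only a suggested name, and the types below are simplified stand-ins for the connector classes, with a `String[]` pair used in place of Flink's `Tuple2`.

```java
import java.io.Serializable;

final class MapperRenameSketch {

    // Stand-in for the connector's command enum.
    enum RedisCommand { LPUSH, PUBLISH }

    // Simplified stand-in for RedisCommandDescription.
    static final class RedisCommandDescription implements Serializable {
        private static final long serialVersionUID = 1L;
        private final RedisCommand command;

        RedisCommandDescription(RedisCommand command) {
            this.command = command;
        }

        RedisCommand getCommand() {
            return command;
        }
    }

    // getCommandDescription() is the hypothetical new name for getDataTypeDescription().
    interface RedisMapper<T> extends Serializable {
        RedisCommandDescription getCommandDescription();

        String getKeyFromData(T data);

        String getValueFromData(T data);
    }

    // Maps a {channel, message} pair to a PUBLISH command.
    static final class PublishMapper implements RedisMapper<String[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.PUBLISH);
        }

        @Override
        public String getKeyFromData(String[] data) {
            return data[0];
        }

        @Override
        public String getValueFromData(String[] data) {
            return data[1];
        }
    }

    public static void main(String[] args) {
        PublishMapper mapper = new PublishMapper();
        String[] record = {"news", "hello"};
        System.out.println(mapper.getCommandDescription().getCommand()
            + " " + mapper.getKeyFromData(record)
            + " " + mapper.getValueFromData(record));
    }
}
```

With the method name matching its return type, the Javadoc examples that still mention `RedisDataTypeDescription` would need the same update.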


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357102#comment-15357102
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69134232
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357099#comment-15357099
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69134099
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link 
#RedisSink(FlinkJedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link 
#RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create 
connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. 
Use this if Redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(FlinkJedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis 
cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
+ *
+ * private RedisCommand redisCommand;
+ *
+ * public RedisAdditionalDataMapper(RedisCommand redisCommand){
+ * this.redisCommand = redisCommand;
+ * }
+ * public RedisDataTypeDescription getDataTypeDescription() {
+ * return new RedisDataTypeDescription(redisCommand, REDIS_ADDITIONAL_KEY);
+ * }
+ * public String getKeyFromData(Tuple2<String, String> data) {
+ * return data.f0;
+ * }
+ * public String getValueFromData(Tuple2<String, String> data) {
+ * return data.f1;
+ * }
+ *}
+ *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ *.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
+ *new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper(RedisCommand.LPUSH));
+ *}
+ *
+ * @param <IN> Type of the elements emitted by this sink
+ */
+public class RedisSink<IN> extends RichSinkFunction<IN> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   /**
+* This additional key needed for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET}.
+* Other {@link RedisDataType} works only with two variable i.e. name 
of the list and value to be added.
+* But for {@link RedisDataType#HASH} and {@link 
RedisDataType#SORTED_SET} we need three variables.
+* For {@link RedisDataType#HASH} we need hash name, hash key and 
element.

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357089#comment-15357089
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r69132723
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.util.Preconditions;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for Jedis Pool.
+ */
+public class FlinkJedisPoolConfig implements Serializable {
--- End diff --

The 3 configuration classes seem to have quite a bit of duplicate code, as 
well as inconsistencies between them.
For example, all 3 have getters / setters for `timeout`, `maxTotal`, 
`maxIdle`, etc., but the accessor for `timeout` is sometimes called 
`getSoTimeout` and sometimes `getTimeout`.

Can we implement the shared settings in a base class, and let the 3 types 
of configuration extend it?
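
A minimal sketch of that refactoring, under stated assumptions: the class names `FlinkJedisConfigBase`, `PoolConfig` and `SentinelConfig` and the exact field set are hypothetical, chosen only to show the shared settings living in one place under one accessor name.

```java
import java.io.Serializable;

final class ConfigBaseSketch {

    // Hypothetical base class holding the settings shared by all configurations.
    abstract static class FlinkJedisConfigBase implements Serializable {
        private static final long serialVersionUID = 1L;

        private final int connectionTimeout;
        private final int maxTotal;
        private final int maxIdle;

        FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle) {
            if (connectionTimeout < 0) {
                throw new IllegalArgumentException("Connection timeout can not be negative");
            }
            this.connectionTimeout = connectionTimeout;
            this.maxTotal = maxTotal;
            this.maxIdle = maxIdle;
        }

        // One accessor name for every configuration type.
        int getConnectionTimeout() { return connectionTimeout; }

        int getMaxTotal() { return maxTotal; }

        int getMaxIdle() { return maxIdle; }
    }

    // Single-server configuration adds only host and port.
    static final class PoolConfig extends FlinkJedisConfigBase {
        private static final long serialVersionUID = 1L;

        private final String host;
        private final int port;

        PoolConfig(String host, int port, int connectionTimeout, int maxTotal, int maxIdle) {
            super(connectionTimeout, maxTotal, maxIdle);
            this.host = host;
            this.port = port;
        }

        String getHost() { return host; }

        int getPort() { return port; }
    }

    // Sentinel configuration adds only the master name.
    static final class SentinelConfig extends FlinkJedisConfigBase {
        private static final long serialVersionUID = 1L;

        private final String masterName;

        SentinelConfig(String masterName, int connectionTimeout, int maxTotal, int maxIdle) {
            super(connectionTimeout, maxTotal, maxIdle);
            this.masterName = masterName;
        }

        String getMasterName() { return masterName; }
    }

    public static void main(String[] args) {
        FlinkJedisConfigBase[] configs = {
            new PoolConfig("localhost", 6379, 2000, 8, 8),
            new SentinelConfig("mymaster", 2000, 8, 8)
        };
        for (FlinkJedisConfigBase config : configs) {
            System.out.println(config.getClass().getSimpleName()
                + " timeout=" + config.getConnectionTimeout());
        }
    }
}
```

A cluster configuration would extend the same base in the same way, adding only its node list.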


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357039#comment-15357039
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax 
I think the failing `JMXReporterTest.testJMXAvailability` was just hotfixed 
with this commit yesterday:

https://github.com/apache/flink/commit/53630da01bcbfe05eda90869b1198b4e1c554a86


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357030#comment-15357030
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Sure, I'll give a full review now.


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356225#comment-15356225
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger What about this failing test...
```
JMXReporterTest.testJMXAvailability:148 » Runtime Could not start JMX server o...
```
There seems to be no JIRA for it. Is this a known issue, or not an issue at 
all?


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352920#comment-15352920
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax, @rmetzger please review. The changed model is described in the PR 
description.

thanks,
subhankar


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351701#comment-15351701
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
My two cents:
1) This seems to have been sorted out (thanks @tzulitai for the input!)
2) I personally do not care too much about the name conflict. Reusing the 
same class for sink and source sounds reasonable. Maybe `FlinkRedisConfig` as 
the name?
3) Agreed. `PUBSUB` as data type is fine IMHO (so we can use the same type 
for source and sink, which makes it clear whether PUB or SUB is used)
4 + 5) Your suggestions sound good to me. Please update the PR so we can get 
a better look and feel for it :)

@tzulitai @subhankarb Thanks a lot!


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350742#comment-15350742
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68551999
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
--- End diff --

The `RedisDataTypeDescription.additionalKey` is used for the key. Yes, the 
param names are confusing. I am changing them to follow the same naming as 
the client. Thank you very much. :)


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350696#comment-15350696
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68546159
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
--- End diff --

Is it intended to use the `additionalKey` as the primary key for `HASH` 
too? If yes, the naming here is very confusing. `hashName` / `hashField` should 
refer to the inner secondary key.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350685#comment-15350685
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68545039
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
--- End diff --

The argument order here is wrong.
Jedis takes the argument for hset in this order: `(key, hashField, value)`. 
The primary key goes first, then the secondary.
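
To make the ordering concrete, here is a minimal sketch that uses an in-memory stand-in instead of a real Jedis connection. `InMemoryHashStore` and `HsetOrderDemo` are hypothetical names used only for illustration; the stand-in's `hset` merely mirrors the `(key, hashField, value)` parameter order described above and is not the connector's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a Redis hash store (not part of Jedis or Flink). */
class InMemoryHashStore {
    // Outer map: primary key (hash name) -> inner map: hash field (secondary key) -> value.
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    /** Mirrors the (key, hashField, value) order: primary key first, then the field. */
    void hset(String key, String hashField, String value) {
        hashes.computeIfAbsent(key, k -> new HashMap<>()).put(hashField, value);
    }

    /** Returns the value stored under the field of the given hash, or null if absent. */
    String hget(String key, String hashField) {
        Map<String, String> hash = hashes.get(key);
        return hash == null ? null : hash.get(hashField);
    }
}

class HsetOrderDemo {
    public static void main(String[] args) {
        InMemoryHashStore store = new InMemoryHashStore();
        // "user:42" is the primary key, "lastEvent" is the field inside that hash.
        store.hset("user:42", "lastEvent", "login");
        System.out.println(store.hget("user:42", "lastEvent"));
    }
}
```

Swapping the first two arguments would silently create a hash named after the field, which is why the parameter names matter as much as the order.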




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350673#comment-15350673
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @subhankarb,

Sorry, I just read the code and didn't realize you were using the 
`additionalKey` as the `setName` for `SORTED_SET`.

I think this might be quite confusing for Redis users. For them, the 
`setName` of a sorted set is the primary key, and the per-value score in the 
set is the additional / secondary key. The hash data type works the same 
way: the hash fields are the additional / secondary keys. So right now, the 
usage principles for `SORTED_SET` and `HASH` seem to be opposite to each 
other?
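
As a rough illustration of the sorted-set side of this, the sketch below models a sorted set in memory: the set name is the outer (primary) key and each member carries its own score. `InMemorySortedSets` is a hypothetical class written for this comment, not Jedis or connector code, and the `(key, score, member)` parameter order is an assumption chosen to match the primary-key-first convention discussed here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of Redis sorted sets (illustration only). */
class InMemorySortedSets {
    // Primary key (set name) -> member -> score of that member.
    private final Map<String, Map<String, Double>> sets = new HashMap<>();

    /** Adds or updates a member; the set name comes first, then the per-value score. */
    void zadd(String key, double score, String member) {
        sets.computeIfAbsent(key, k -> new HashMap<>()).put(member, score);
    }

    /** Returns all members of the set ordered by score, ties broken alphabetically. */
    List<String> zrange(String key) {
        Map<String, Double> set = sets.getOrDefault(key, new HashMap<>());
        List<String> members = new ArrayList<>(set.keySet());
        members.sort((a, b) -> {
            int byScore = Double.compare(set.get(a), set.get(b));
            return byScore != 0 ? byScore : a.compareTo(b);
        });
        return members;
    }
}
```

Because the score decides each element's position, it has to come from the individual record, while the set name can stay fixed for the whole sink.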




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350646#comment-15350646
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@tzulitai, @mjsax thank you very much for your valuable feedback.
1) `additionalKey` was supposed to be a one-time declaration for 
`SORTED_SET` and `HASH`. For `HASH` it is the hash name, and for 
`SORTED_SET` it is the name of the set. I assumed it would not change once 
it is declared. 

2) `JedisPoolConfig` / `JedisClusterConfig` were not `Serializable`, so I 
needed a wrapper class for them. @mjsax there is a story for a Redis source, 
so `RedisSinkConfig` would not be a fitting name, as the config would be the 
same for sink and source. That's why I made the `JedisPoolConfig` / 
`JedisClusterConfig` constructors private, so that users are forced to use 
the builder class, which avoids confusion. 

3) When I started on this I thought it was obvious that if I use PUBSUB 
in a sink I would always use it to publish, and if I use it in a source I 
would always use it to subscribe. :) 

4 + 5) We can group the command and data type like this:
`public enum RedisCommand {
    LPUSH(RedisDataType.LIST),
    RPUSH(RedisDataType.LIST);
    private RedisDataType redisDataType;
    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }
    public boolean isInRedisDataType(RedisDataType redisDataType) {
        return this.redisDataType == redisDataType;
    }
}`
And in `RedisDataTypeDescription` we can extract the command, so that new 
commands can be added in the future. 
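
A slightly fuller, self-contained sketch of that grouping might look as follows. The `HSET` and `ZADD` entries, the members of `RedisDataType`, and the `commandsFor` helper are assumptions added for illustration; only `LPUSH`, `RPUSH` and `isInRedisDataType` come from the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

/** Data types assumed for this sketch; the real enum may contain more members. */
enum RedisDataType { LIST, HASH, SORTED_SET }

/** Each command declares the data type it operates on. */
enum RedisCommand {
    LPUSH(RedisDataType.LIST),
    RPUSH(RedisDataType.LIST),
    HSET(RedisDataType.HASH),        // assumed entry, not in the original snippet
    ZADD(RedisDataType.SORTED_SET);  // assumed entry, not in the original snippet

    private final RedisDataType redisDataType;

    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }

    /** True if this command belongs to the given data type. */
    boolean isInRedisDataType(RedisDataType redisDataType) {
        return this.redisDataType == redisDataType;
    }

    /** Hypothetical helper: lists every command available for a data type. */
    static List<RedisCommand> commandsFor(RedisDataType dataType) {
        List<RedisCommand> matching = new ArrayList<>();
        for (RedisCommand command : values()) {
            if (command.isInRedisDataType(dataType)) {
                matching.add(command);
            }
        }
        return matching;
    }
}
```

With this layout, adding a command later is a one-line change to the enum, and a validation step can reject a command that does not fit the configured data type.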





[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349640#comment-15349640
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/1813
  
Hi @mjsax,

Regarding 1): it just occurred to me that for SORTED_SET, a fixed secondary 
key doesn't make sense, because the secondary key is supposed to define the 
order of the inserted element in the set. For HASH, I guess it depends on 
the use case. For example, a stream of user events with a user id can update 
Redis using the user id as primary key, with the inner hash field (secondary 
key) depending on what event occurred.

I agree with your suggestion to settle on a subset of commands to start 
with and, if we want to, make it more flexible with multiple commands per 
data type in new JIRAs, as that would probably need larger refactoring.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349615#comment-15349615
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@tzulitai Thanks a lot for testing this! Your feedback is really great!

1) I am not a Redis user either -- from my understanding, the second key 
determines the field that will be updated -- it makes sense to me that this 
field is the same for all updates (i.e., independent of the data itself). 
Thus, it would not make sense to extract it from the record via 
`RedisMapper`.
2) Might be confusing for Redis users (I did not stumble over it as a 
non-user ;)) -- renaming seems reasonable -- maybe `RedisSinkConfig`?
3+4) Well, the data type is the same, no matter if you want to read or 
write it. But I agree that using the data type itself in `RedisSink` might 
not be sufficient. I think the idea to support `LPUSH` was that Flink as a 
streaming system might do best to append at the tail, not at the head... 
5) Similar to my answer to (4) -- the data type is the same, independent of 
the operation you perform on it. And you state yourself that `RedisMapper` 
maps the type to the command/action.
6) See my answer to (1)

As a non-Redis user, it is hard for me to judge what flexibility is 
required/expected by users (i.e., secondary key per record or not?). It also 
seems that not all available actions (LPUSH vs RPUSH) are implemented. How 
important is it to support all actions -- do you think that the current 
ones cover the most basic subset to get started with? It might be good to 
just start with a subset of commands and add new commands later on (just a 
suggestion). And do you think the implemented actions for each type are the 
most useful? If we want to support multiple different actions per data type, 
it would be a larger refactoring of this code, I think.

@subhankarb @rmetzger What is your opinion on this?




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349055#comment-15349055
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68484062
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a single Redis server 
or to Redis sentinels
+ * If want to connect to a single Redis server, plz use the first 
constructor {@link #RedisContainer(JedisPool)}.
+ * If want to connect to a Redis sentinels, Plz use the second constructor 
{@link #RedisContainer(JedisSentinelPool)}
+ */
+public class RedisContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Use this constructor if to connect with single Redis server.
+*
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be 
null");
+   this.jedisPool = jedisPool;
+   }
+
+   /**
+* Use this constructor if Redis environment is clustered with 
sentinels.
+*
+* @param sentinelPool SentinelPool which actually manages Jedis 
instances
+*/
+   public RedisContainer(final JedisSentinelPool sentinelPool) {
+   Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool 
can not be null");
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+   /**
+* Closes the Jedis instances.
+*/
+   @Override
+   public void close() throws IOException {
+   if (this.jedisPool != null) {
+   this.jedisPool.close();
+   }
+   if (this.jedisSentinelPool != null) {
+   this.jedisSentinelPool.close();
+   }
+   }
+
+   @Override
+   public void hset(final String hashName, final String key, final String 
value) {
+   Jedis jedis = null;
+   try {
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to key {} and field {} error message {}",
+   key, key, e.getMessage());
+   }
+   } finally {
+   releaseInstance(jedis);
--- End diff --

`releaseInstance(jedis)` uses the Jedis `close()` method, which returns the 
resource to the connection pool.
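
The borrow / use / release pattern behind this can be sketched without Jedis at all. `SimplePool`, `PooledConnection` and `ReleaseDemo` below are hypothetical stand-ins written for this comment; they only illustrate why releasing in `finally` keeps the pool from leaking when a send fails, and they are not the connector's classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical fixed-size pool (stand-in for a real connection pool). */
class SimplePool {
    private final Deque<PooledConnection> idle = new ArrayDeque<>();

    SimplePool(int size) {
        for (int i = 0; i < size; i++) {
            idle.push(new PooledConnection(this));
        }
    }

    /** Hands out an idle connection or fails if none is left. */
    PooledConnection borrow() {
        PooledConnection connection = idle.poll();
        if (connection == null) {
            throw new IllegalStateException("pool exhausted");
        }
        return connection;
    }

    void giveBack(PooledConnection connection) {
        idle.push(connection);
    }

    int idleCount() {
        return idle.size();
    }
}

/** Hypothetical pooled resource: close() returns it to its pool instead of destroying it. */
class PooledConnection implements AutoCloseable {
    private final SimplePool owner;

    PooledConnection(SimplePool owner) {
        this.owner = owner;
    }

    /** Pretends to send a message; rejects null to simulate a failing command. */
    void send(String message) {
        if (message == null) {
            throw new IllegalArgumentException("message must not be null");
        }
    }

    @Override
    public void close() {
        owner.giveBack(this);
    }
}

class ReleaseDemo {
    /** Borrows a connection, sends, and always releases it, even when send() throws. */
    static void sendSafely(SimplePool pool, String message) {
        PooledConnection connection = null;
        try {
            connection = pool.borrow();
            connection.send(message);
        } catch (RuntimeException e) {
            System.err.println("Cannot send message: " + e.getMessage());
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
```

Without the `finally` block, every failed send would permanently remove one connection from the pool.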



[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348606#comment-15348606
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
@rmetzger Thanks :)
I did not test with a Redis cluster or similar.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348422#comment-15348422
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
Please address last comment. Otherwise, LGTM.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348407#comment-15348407
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68412669
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
--- End diff --

You can actually remove the whole JavaDoc comment (it will be inherited 
automatically). If you specify a new JavaDoc, it replaces the old one; using 
the `inheritDoc` tag, you can include the original JavaDoc and extend it.
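
A small sketch of both options, using made-up types (`Commands`, `InheritsSilently`, `ExtendsDoc` are hypothetical and not part of this PR):

```java
interface Commands {
    /** Returns the name of the Redis command this container issues. */
    String commandName();
}

class InheritsSilently implements Commands {
    // No doc comment here: the generated docs reuse the comment from Commands#commandName.
    @Override
    public String commandName() {
        return "HSET";
    }
}

class ExtendsDoc implements Commands {
    /**
     * {@inheritDoc}
     *
     * <p>This implementation always reports {@code RPUSH}.
     */
    @Override
    public String commandName() {
        return "RPUSH";
    }
}
```

In the first class the interface comment is shown unchanged; in the second, the inherited text is pulled in where the tag stands and the extra sentence is appended after it.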




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343862#comment-15343862
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax plz review.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343860#comment-15343860
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68008039
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
+
--- End diff --

replaced with {@inheritDoc}




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343857#comment-15343857
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r68007965
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Sentinel Pool.
+ */
+public class JedisSentinelConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The master name and sentinels are mandatory, and when you didn't set 
these, it throws NullPointerException.
+*
+* @param masterName master name of the replica set
+* @param sentinels set of sentinel hosts
+* @param connectionTimeout timeout connection timeout
+* @param soTimeout timeout socket timeout
+* @param password password password, if any
+* @param database database database index
+* @param maxTotal maxTotal the maximum number of objects that can be 
allocated by the pool
+* @param maxIdle the cap on the number of "idle" instances in the pool
+* @param minIdle the minimum number of idle objects to maintain in the 
pool
+*
--- End diff --

fixed




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343708#comment-15343708
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67994813
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(final String hashName, final String key, final String value) {
+   try {
+   jedisCluster.hset(hashName, key, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command HSET to hash {} field {} error message {}",
+   hashName, key, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at key.
+* If key does not exist, it is created as empty list before performing the push operation.
+*
+* @param listName Name of the List
+* @param value  Value to be added
+*/
+   @Override
+   public void rpush(final String listName, final String value) {
+   try {
+   jedisCluster.rpush(listName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}",
+   listName, e.getMessage());
+   }
+   }
+   }
+
+   /**
+* Add the specified members to the set stored at key.
+* Specified members that are already a member of this set are ignored.
+* If key does not exist, a new set is created before adding the specified members.
+*
+* @param setName Name of the Set
+* @param value   Value to be added
+*/
+   @Override
+   public void sadd(final String setName, final String value) {
+   try {
+   jedisCluster.sadd(setName, value);
+   } catch (Exception e) {
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with command SADD to set {} error message {}",
+   setName, e.getMessage());
+   }
+   }
+   }
+
+   /**
+

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342139#comment-15342139
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67905768
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Redis command container if we want to connect to a Redis cluster.
+ */
+public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class);
+
+   private JedisCluster jedisCluster;
+
+   /**
+* Initialize Redis command container for Redis cluster.
+*
+* @param jedisCluster JedisCluster instance
+*/
+   public RedisClusterContainer(JedisCluster jedisCluster) {
+   this.jedisCluster = jedisCluster;
--- End diff --

This is already checked in the RedisCommandsContainerBuilder.build method.
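
For illustration, a check done once in a builder method could look like the sketch below. It assumes the check in question is a null check on the client handed to the container, which the thread does not spell out. `ContainerSketch`, `ContainerBuilderSketch` and the plain `Object` client are hypothetical stand-ins, not the classes from the pull request.

```java
import java.util.Objects;

// Hypothetical container: wraps a client and deliberately does no validation itself.
final class ContainerSketch {
    private final Object client;

    ContainerSketch(Object client) {
        this.client = client;
    }

    Object getClient() {
        return client;
    }
}

// Hypothetical builder: the single place where the argument is validated.
final class ContainerBuilderSketch {
    static ContainerSketch build(Object client) {
        // Fails fast with NullPointerException before any container is created.
        Objects.requireNonNull(client, "Redis client must not be null");
        return new ContainerSketch(client);
    }
}
```

With this layout the constructor stays trivial, but the guarantee only holds as long as every caller goes through the build method.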




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342111#comment-15342111
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67904127
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for Jedis Cluster.
+ */
+public class JedisClusterConfig implements Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private Set<InetSocketAddress> nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* The list of nodes is mandatory; if nodes is not set, a NullPointerException is thrown.
--- End diff --

The constructor is private, so the user has to set everything through the 
builder class, except nodes.
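
A minimal sketch of the pattern described here, a private constructor that is reachable only through a builder and that fails with NullPointerException when the mandatory nodes are missing. `ClusterConfigSketch`, its two fields and the 2000 ms default are assumptions made for the example; this is not the JedisClusterConfig code from the pull request.

```java
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a cluster configuration class.
final class ClusterConfigSketch {
    private final Set<InetSocketAddress> nodes;
    private final int timeout;

    // Private: instances can only be created through Builder.build().
    private ClusterConfigSketch(Set<InetSocketAddress> nodes, int timeout) {
        // Throws NullPointerException when nodes was never set on the builder.
        Objects.requireNonNull(nodes, "nodes must be set");
        this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes));
        this.timeout = timeout;
    }

    Set<InetSocketAddress> getNodes() {
        return nodes;
    }

    int getTimeout() {
        return timeout;
    }

    static final class Builder {
        private Set<InetSocketAddress> nodes; // mandatory, no default
        private int timeout = 2000;           // assumed default, in milliseconds

        Builder setNodes(Set<InetSocketAddress> nodes) {
            this.nodes = nodes;
            return this;
        }

        Builder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        ClusterConfigSketch build() {
            return new ClusterConfigSketch(nodes, timeout);
        }
    }
}
```

Optional values fall back to the builder's defaults, while the mandatory one is checked in a single place, the constructor.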




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341275#comment-15341275
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user subhankarb commented on the issue:

https://github.com/apache/flink/pull/1813
  
@mjsax Wow, that's a lot. Thank you very much for your time. I will fix 
these ASAP. 




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338767#comment-15338767
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the issue:

https://github.com/apache/flink/pull/1813
  
Most comments are nits or minor. Please fix them. Otherwise, looks good.




[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338764#comment-15338764
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622740
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());
+   RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSortedSetDataType() throws Exception {
+   DataStreamSo

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338760#comment-15338760
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622724
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338761#comment-15338761
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622731
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338762#comment-15338762
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622734
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338763#comment-15338763
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622738
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisSinkTest extends RedisTestBase {
+
+   private JedisPoolConfig jedisPoolConfig;
+   private static final Long NUM_ELEMENTS = 20L;
+   private static final String REDIS_KEY = "TEST_KEY";
+   private static final String REDIS_ADDITIONAL_KEY = 
"TEST_ADDITIONAL_KEY";
+
+   StreamExecutionEnvironment env;
+
+
+   private Jedis jedis;
+
+   @Before
+   public void setUp(){
+   jedisPoolConfig = new JedisPoolConfig.Builder()
+   .setHost(REDIS_HOST)
+   .setPort(REDIS_PORT).build();
+   jedis = new Jedis(REDIS_HOST, REDIS_PORT);
+   env = StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+
+   @Test
+   public void testRedisListDataType() throws Exception {
+   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
+   RedisSink> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.LIST));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis List Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSetDataType() throws Exception {
+   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
+   RedisSink> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.SET));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Set Data Type");
+
+   assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisHyperLogLogDataType() throws Exception {
+   DataStreamSource> source = 
env.addSource(new TestSourceFunction());
+   RedisSink> redisSink = new 
RedisSink<>(jedisPoolConfig,
+   new RedisDataMapper(RedisDataType.HYPER_LOG_LOG));
+
+   source.addSink(redisSink);
+   env.execute("Test Redis Hyper Log Log Data Type");
+
+   assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY)));
+
+   jedis.del(REDIS_KEY);
+   }
+
+   @Test
+   public void testRedisSortedSetDataType() throws Exception {
+   DataStreamSo

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-06-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338759#comment-15338759
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r67622681
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.redis;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link redis.clients.jedis.JedisPool}.
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if redis is
+ * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster.
+ *
+ * Example:
+ *
+ * 
+ *{@code
+ *public static class RedisExampleDataMapper implements RedisMapper> {
+ *public RedisDataTypeDescription getDataTypeDescription() {
+ *return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY);
+ *}
+ *public String getKeyFromData(Tuple2 data) {
+ *return String.valueOf(data.f0);
+ *}
+ *public String getValueFromData(Tuple2 data) {
+ *return String.valueOf(data.f1);
--- End diff --

return data.f0;
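
For readers following the review comment above, here is a minimal sketch of the difference between wrapping a tuple field in String.valueOf and returning it directly. It assumes the tuple fields are already of type String (the generic parameters are not visible in the quoted diff). The names KeyExtractionSketch, Pair, keyViaValueOf and keyDirect are hypothetical and exist only for this illustration; Pair is a stand-in for Flink's Tuple2 so the example needs no dependencies.

```java
// Sketch only: not code from the pull request.
final class KeyExtractionSketch {

    // Hypothetical stand-in for Flink's Tuple2 (public fields f0 and f1).
    static final class Pair<A, B> {
        final A f0;
        final B f1;

        Pair(A f0, B f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Style used in the quoted Javadoc example: wrap the field in String.valueOf.
    static String keyViaValueOf(Pair<String, String> data) {
        return String.valueOf(data.f0);
    }

    // Style suggested in the review: return the field as it is.
    static String keyDirect(Pair<String, String> data) {
        return data.f0;
    }

    public static void main(String[] args) {
        Pair<String, String> regular = new Pair<String, String>("sensor-1", "42");
        Pair<String, String> missingKey = new Pair<String, String>(null, "42");

        // For a non-null String field both styles return an equal value.
        System.out.println(keyViaValueOf(regular).equals(keyDirect(regular)));

        // For a null field, String.valueOf yields the four-character text "null",
        // while the direct return passes the null reference through.
        System.out.println("null".equals(keyViaValueOf(missingKey)));
        System.out.println(keyDirect(missingKey) == null);
    }
}
```

Under the String assumption, the direct return avoids a redundant call and does not silently turn a missing key into the literal text "null". Whether that is the behavior a mapper wants is a design choice for the connector, not something this sketch settles.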


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033




