[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376320#comment-15376320 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 Hi @kl0u, IMO that is the expected behavior. The sink would not know that if the Redis is down or not unless it tries to send the next data to the Redis. When ever a new message reaches the sink it tries to use the connection pool, then an then only it can throw exception that it can not send the data to Redis. You can build a heartbeat mechanism to check periodically that Redis serve is up or down, and can cancel the job if Redis is down. @mjsax please correct me if my understanding is wrong. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > Fix For: 1.1.0 > > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15373221#comment-15373221 ] ASF GitHub Bot commented on FLINK-3034: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/1813 Hi @subhankarb ! By playing around with the sink during testing, I saw that if Redis goes down in the middle of the execution of a job, the job has to wait until the next element (after the Redis failure) arrives in order to detect that it is down. This is not the expected behavior, as we want the job to fail as soon as Redis goes down. Do you have an idea of why this is happening or how to fix it? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > Fix For: 1.1.0 > > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365814#comment-15365814 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 Thanks @subhankarb ! Great work. Thanks @tzulitai for helping with reviewing! > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > Fix For: 1.1.0 > > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365780#comment-15365780 ] ASF GitHub Bot commented on FLINK-3034: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1813 > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > Fix For: 1.1.0 > > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364231#comment-15364231 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/1813 This is the build: https://travis-ci.org/rmetzger/flink/builds/142738735 > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364216#comment-15364216 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/1813 I'm addressing my last comments myself and merge the change once its green. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364208#comment-15364208 ] ASF GitHub Bot commented on FLINK-3034: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69717542 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.util.TestLogger; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.RedisCluster; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public class RedisSentinelClusterTest extends TestLogger { + + private static RedisCluster cluster; + private static final String REDIS_MASTER = "master"; + private static final String TEST_KEY = "testKey"; + private static final String TEST_VALUE = "testValue"; + private static final List sentinels = Arrays.asList(getAvailablePort(), getAvailablePort()); + private static final List group1 = Arrays.asList(getAvailablePort(), getAvailablePort()); + + private JedisSentinelPool jedisSentinelPool; + private FlinkJedisSentinelConfig jedisSentinelConfig; + + @BeforeClass + public static void setUpCluster(){ + cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1) + .serverPorts(group1).replicationGroup(REDIS_MASTER, 1) + .build(); + cluster.start(); + } + + @Before + public void setUp() { + Set hosts = JedisUtil.sentinelHosts(cluster); + jedisSentinelConfig = new FlinkJedisSentinelConfig.Builder().setMasterName(REDIS_MASTER) + .setSentinels(hosts).build(); + jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels()); + } + + @Test + public void testRedisSentinelOperation() { + RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig); + Jedis jedis = null; + try{ + jedis = jedisSentinelPool.getResource(); + redisContainer.set(TEST_KEY, TEST_VALUE); + assertEquals(TEST_VALUE, jedis.get(TEST_KEY)); + }catch (Exception ex){ + log.warn("Failed to get jedis resource {}", ex.getMessage()); --- End diff -- The test will not fail if there's an exception here. I still think this is almost like ignoring the exceptions here. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This mes
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364180#comment-15364180 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 @rmetzger LGTM. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364135#comment-15364135 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax , @rmetzger plz review. IMO it is ready to get merged at last :) > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362308#comment-15362308 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax @subhankarb: the changes look good to me :) > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361390#comment-15361390 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 @subhankarb two tiny comments @tzulitai @rmetzger Any more comments? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361387#comment-15361387 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69464742 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +public class FlinkJedisConfigBaseTest { --- End diff -- extends TestLogger > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361385#comment-15361385 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69464673 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.Test; + +public class RedisSinkTest { --- End diff -- extends TestLogger > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361350#comment-15361350 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax done. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361181#comment-15361181 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69444799 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. + */ +public abstract class FlinkJedisConfigBase implements Serializable { --- End diff -- done. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361179#comment-15361179 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69444769 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. + */ +public abstract class FlinkJedisConfigBase implements Serializable { + + protected int maxTotal; + protected int maxIdle; + protected int minIdle; + protected int connectionTimeout; + + protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){ + this.connectionTimeout = connectionTimeout; + this.maxTotal = maxTotal; + this.maxIdle = maxIdle; + this.minIdle = minIdle; + } --- End diff -- done > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361180#comment-15361180 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69444781 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private Set nodes; + private int maxRedirections; + + + /** +* Jedis cluster configuration. +* The list of node is mandatory, and when nodes is not set, it throws NullPointerException. +* +* @param nodes list of node information for JedisCluster +* @param connectionTimeout socket / connection timeout. The default is 2000 +* @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK +* @param maxTotal the maximum number of objects that can be allocated by the pool +* @param maxIdle the cap on the number of "idle" instances in the pool +* @param minIdle the minimum number of idle objects to maintain in the pool +* @throws NullPointerException if parameter {@code nodes} is {@code null} +*/ + private FlinkJedisClusterConfig(Set nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Preconditions.checkNotNull(nodes, "Node information should be presented"); --- End diff -- done. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361182#comment-15361182 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69444815 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +/** + * The builder for {@link RedisCommandsContainer}. + */ +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +* @throws NullPointerException if jedisPoolConfig is null +*/ + public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { --- End diff -- done. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361164#comment-15361164 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69442146 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private Set nodes; + private int maxRedirections; + + + /** +* Jedis cluster configuration. +* The list of node is mandatory, and when nodes is not set, it throws NullPointerException. +* +* @param nodes list of node information for JedisCluster +* @param connectionTimeout socket / connection timeout. The default is 2000 +* @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK +* @param maxTotal the maximum number of objects that can be allocated by the pool +* @param maxIdle the cap on the number of "idle" instances in the pool +* @param minIdle the minimum number of idle objects to maintain in the pool +* @throws NullPointerException if parameter {@code nodes} is {@code null} +*/ + private FlinkJedisClusterConfig(Set nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Preconditions.checkNotNull(nodes, "Node information should be presented"); + this.nodes = nodes; + this.maxRedirections = maxRedirections; + } + + + + /** +* Returns nodes. +* +* @return list of node information +*/ + public Set getNodes() { + Set ret = new HashSet<>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + /** +* Returns limit of redirection. +* +* @return limit of redirection +*/ + public int getMaxRedirections() { + return maxRedirections; + } + + + /** +* Builder for initializing {@link FlinkJedisClusterConfig}. +*/ + public static class Builder { --- End diff -- It is static inner class. IMO let it keep that way. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361091#comment-15361091 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 @subhankarb I think one more pass and we are good to go! > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361089#comment-15361089 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69432240 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +public class JedisClusterConfigTest extends TestLogger { + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointExceptionIfNodeValueIsNull(){ + FlinkJedisClusterConfig.Builder builder = new FlinkJedisClusterConfig.Builder(); + builder.build(); + } --- End diff -- Add test for new check `isEmpty()`. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361081#comment-15361081 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69431563 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisSinkITCase extends RedisITCaseBase { + + private FlinkJedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new FlinkJedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisCommandMapper(RedisCommand.LPUSH)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisCommandMapper(RedisCommand.SADD)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisCommandMapper(RedisCommand.PFADD)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception {
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361075#comment-15361075 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69430812 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Use this constructor if to connect with single Redis server. +* +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + } + + /** +* Use this constructor if Redis environment is clustered with sentinels. +* +* @param sentinelPool SentinelPool which actually manages Jedis instances +*/ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisSentinelPool = sentinelPool; + } + + /** +* Closes the Jedis instances. +*/ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String key, final String hashField, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(key, hashField, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and hashField {} error message {}", + key, hashField, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + + @Override + public void rpush(final String listName, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}", +
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361071#comment-15361071 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69430592 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, please use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Please use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; --- End diff -- make both `final` and set to `null` in constructors accordingly. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361070#comment-15361070 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69430279 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The container for all available Redis Commands. --- End diff -- Nit: [c]ommands > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361067#comment-15361067 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69430042 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java --- @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.util.Set; + +/** + * Configuration for Jedis Sentinel Pool. + */ +public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class); + + private String masterName; + private Set sentinels; + private int soTimeout; + private String password; + private int database; + + /** +* Jedis Sentinels config. +* The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException. +* +* @param masterName master name of the replica set +* @param sentinels set of sentinel hosts +* @param connectionTimeout timeout connection timeout +* @param soTimeout timeout socket timeout +* @param password password, if any +* @param database database database index +* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool +* @param maxIdle the cap on the number of "idle" instances in the pool +* @param minIdle the minimum number of idle objects to maintain in the pool +* +* @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null} +* @throws IllegalArgumentException if {@code sentinels} are empty +*/ + private FlinkJedisSentinelConfig(String masterName, Set sentinels, + int connectionTimeout, int soTimeout, + String password, int database, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + Preconditions.checkNotNull(masterName, "Master name should be presented"); + Preconditions.checkNotNull(sentinels, "Sentinels information should be presented"); + Preconditions.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty"); + + this.masterName = masterName; + this.sentinels = sentinels; --- End diff -- Should we copy this `Set` to guard against external modifications? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361066#comment-15361066 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69429926 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java --- @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.util.Set; + +/** + * Configuration for Jedis Sentinel Pool. + */ +public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkJedisSentinelConfig.class); + + private String masterName; + private Set sentinels; + private int soTimeout; + private String password; + private int database; --- End diff -- `final` to all > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361065#comment-15361065 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69429896 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java --- @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.util.Set; + +/** + * Configuration for Jedis Sentinel Pool. --- End diff -- Nit: [p]ool > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361062#comment-15361062 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69429657 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.Protocol; + +/** + * Configuration for Jedis Pool. + */ +public class FlinkJedisPoolConfig extends FlinkJedisConfigBase { + + private static final long serialVersionUID = 1L; + + private String host; + private int port; + private int database; + private String password; --- End diff -- `final` to all > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361060#comment-15361060 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69429555 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.Protocol; + +/** + * Configuration for Jedis Pool. --- End diff -- Nit: [p]ool > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361058#comment-15361058 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69429234 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. + */ +public abstract class FlinkJedisConfigBase implements Serializable { + + protected int maxTotal; + protected int maxIdle; + protected int minIdle; + protected int connectionTimeout; + + protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){ + this.connectionTimeout = connectionTimeout; + this.maxTotal = maxTotal; + this.maxIdle = maxIdle; + this.minIdle = minIdle; + } --- End diff -- Should we check for negative values? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361049#comment-15361049 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69428684 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private Set nodes; + private int maxRedirections; + + + /** +* Jedis cluster configuration. +* The list of node is mandatory, and when nodes is not set, it throws NullPointerException. +* +* @param nodes list of node information for JedisCluster +* @param connectionTimeout socket / connection timeout. The default is 2000 +* @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK +* @param maxTotal the maximum number of objects that can be allocated by the pool +* @param maxIdle the cap on the number of "idle" instances in the pool +* @param minIdle the minimum number of idle objects to maintain in the pool +* @throws NullPointerException if parameter {@code nodes} is {@code null} +*/ + private FlinkJedisClusterConfig(Set nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Preconditions.checkNotNull(nodes, "Node information should be presented"); --- End diff -- Should we throw on `nodes.isEmpty()` ? Maybe also make a copy of the `Set` to ensure it cannot be altered from outside? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361045#comment-15361045 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69428463 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private Set nodes; + private int maxRedirections; --- End diff -- `final` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361044#comment-15361044 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69428451 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private Set nodes; --- End diff -- `final` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361041#comment-15361041 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69428276 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. --- End diff -- Nit: [c]luster > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361038#comment-15361038 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69428038 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); + *} + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** +* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. +* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. +* But for {@link RedisDataType#HASH} and {@link Re
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361037#comment-15361037 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69427880 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); + *} + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** +* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. +* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. +* But for {@link RedisDataType#HASH} and {@link Re
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361034#comment-15361034 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69427481 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} +Version Compatibility: This module is compatible with redis 2.8.5 . + +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download). + + Redis Sink +A class providing an interface for sending data to Redis. +The sink can use three different methods for communicating with different type of Redis environments: +1. Single Redis Server +2. Redis Cluster +3. Redis Sentinel + +This code shows how to create a sink that communicate to a single redis server: + + + +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper>{ + +@Override +public RedisCommandDescription getCommandDescription() { +return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); +} + +@Override +public String getKeyFromData(Tuple2 data) { +return data.f0; +} + +@Override +public String getValueFromData(Tuple2 data) { +return data.f1; +} +} +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ + override def getCommandDescription: RedisCommandDescription = { +new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME") + } + + override def getKeyFromData(data: (String, String)): String = data._1 + + override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example code does the same, but for Redis Cluster: + + + +{% highlight java %} + +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() +.setNodes(new HashSet(Arrays.asList(new InetSocketAddress(5601.build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example shows when the Redis environment is with Sentinels: + + + +{% highlight java %} + +FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder() +.setMasterName("master").setSentinels(...).build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This section gives a description of all the available data types and what redis command used for that. + + + + + Data Type + Redis Command [Sink] + Redis Command [Source] + + + + +HASHhttp://redis.io/commands/hset";> HSET--NA-- + + +L
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361033#comment-15361033 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69427444 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} +Version Compatibility: This module is compatible with redis 2.8.5 . + +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download). + + Redis Sink +A class providing an interface for sending data to Redis. +The sink can use three different methods for communicating with different type of Redis environments: +1. Single Redis Server +2. Redis Cluster +3. Redis Sentinel + +This code shows how to create a sink that communicate to a single redis server: + + + +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper>{ + +@Override +public RedisCommandDescription getCommandDescription() { +return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); +} + +@Override +public String getKeyFromData(Tuple2 data) { +return data.f0; +} + +@Override +public String getValueFromData(Tuple2 data) { +return data.f1; +} +} +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ + override def getCommandDescription: RedisCommandDescription = { +new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME") + } + + override def getKeyFromData(data: (String, String)): String = data._1 + + override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example code does the same, but for Redis Cluster: + + + +{% highlight java %} + +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() +.setNodes(new HashSet(Arrays.asList(new InetSocketAddress(5601.build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example shows when the Redis environment is with Sentinels: + + + +{% highlight java %} + +FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder() +.setMasterName("master").setSentinels(...).build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This section gives a description of all the available data types and what redis command used for that. + + + + + Data Type + Redis Command [Sink] + Redis Command [Source] + + + + +HASHhttp://redis.io/commands/hset";> HSET--NA-- --- End diff -- Nit: remove blank
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361032#comment-15361032 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69427347 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} +Version Compatibility: This module is compatible with redis 2.8.5 . + +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download). + + Redis Sink +A class providing an interface for sending data to Redis. +The sink can use three different methods for communicating with different type of Redis environments: +1. Single Redis Server +2. Redis Cluster +3. Redis Sentinel + +This code shows how to create a sink that communicate to a single redis server: + + + +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper>{ + +@Override +public RedisCommandDescription getCommandDescription() { +return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME"); +} + +@Override +public String getKeyFromData(Tuple2 data) { +return data.f0; +} + +@Override +public String getValueFromData(Tuple2 data) { +return data.f1; +} +} +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ + override def getCommandDescription: RedisCommandDescription = { +new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME") + } + + override def getKeyFromData(data: (String, String)): String = data._1 + + override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example code does the same, but for Redis Cluster: + + + +{% highlight java %} + +FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder() +.setNodes(new HashSet(Arrays.asList(new InetSocketAddress(5601.build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new FlinkJedisPoolConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example shows when the Redis environment is with Sentinels: + + + +{% highlight java %} + +FlinkJedisSentinelConfig conf = new FlinkJedisSentinelConfig.Builder() +.setMasterName("master").setSentinels(...).build(); + +DataStream stream = ...; +stream.addSink(new RedisSink>(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new FlinkJedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This section gives a description of all the available data types and what redis command used for that. --- End diff -- [R]edis > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361028#comment-15361028 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69427102 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} +Version Compatibility: This module is compatible with redis 2.8.5 . + +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). --- End diff -- Style: Replace "See linking with them..." by "You need to link them for cluster execution explicitly." -- with "explicitly" as the link, ie, it replaces "here". > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361027#comment-15361027 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69426825 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} +Version Compatibility: This module is compatible with redis 2.8.5 . --- End diff -- Nit. Should there be a whitespace after the version number? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360878#comment-15360878 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 Hi @subhankarb, Thank you for addressing our comments. I have some last nit-pick comments to make the code just a little better, otherwise the changes LGTM :) Perhaps we should also wait for @mjsax to give another review on the last changes too, before you get on to addressing the latest comments. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360868#comment-15360868 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69410536 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. + */ +public abstract class FlinkJedisConfigBase implements Serializable { + + protected int maxTotal; + protected int maxIdle; + protected int minIdle; + protected int connectionTimeout; --- End diff -- nit: I would add `final` modifier to these, we don't seem to want to be changing them after the user creates the config. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360861#comment-15360861 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69410042 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; + +import java.io.Serializable; + +/** + * Base class for Flink Redis configuration. + */ +public abstract class FlinkJedisConfigBase implements Serializable { --- End diff -- This is missing a `serialVersionUID `. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360854#comment-15360854 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69409744 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); --- End diff -- This part in the Javadoc seems to be wrong too. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360846#comment-15360846 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69409336 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class FlinkJedisClusterConfig extends FlinkJedisConfigBase { + private static final long serialVersionUID = 1L; + + private Set nodes; + private int maxRedirections; + + + /** +* Jedis cluster configuration. +* The list of node is mandatory, and when nodes is not set, it throws NullPointerException. +* +* @param nodes list of node information for JedisCluster +* @param connectionTimeout socket / connection timeout. The default is 2000 +* @param maxRedirections limit of redirections-how much we'll follow MOVED or ASK +* @param maxTotal the maximum number of objects that can be allocated by the pool +* @param maxIdle the cap on the number of "idle" instances in the pool +* @param minIdle the minimum number of idle objects to maintain in the pool +* @throws NullPointerException if parameter {@code nodes} is {@code null} +*/ + private FlinkJedisClusterConfig(Set nodes, int connectionTimeout, int maxRedirections, + int maxTotal, int maxIdle, int minIdle) { + super(connectionTimeout, maxTotal, maxIdle, minIdle); + + Preconditions.checkNotNull(nodes, "Node information should be presented"); + this.nodes = nodes; + this.maxRedirections = maxRedirections; + } + + + + /** +* Returns nodes. +* +* @return list of node information +*/ + public Set getNodes() { + Set ret = new HashSet<>(); + for (InetSocketAddress node : nodes) { + ret.add(new HostAndPort(node.getHostName(), node.getPort())); + } + return ret; + } + + /** +* Returns limit of redirection. +* +* @return limit of redirection +*/ + public int getMaxRedirections() { + return maxRedirections; + } + + + /** +* Builder for initializing {@link FlinkJedisClusterConfig}. +*/ + public static class Builder { --- End diff -- There also seems to be duplicate code for the three `Builder`s. Can we consolidate the commonly shared code of the `Builder`s into `FlinkConfigConsumerBase` too? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360842#comment-15360842 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69409088 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +/** + * The builder for {@link RedisCommandsContainer}. + */ +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +* @throws NullPointerException if jedisPoolConfig is null +*/ + public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { --- End diff -- Perhaps the `build` functions can make use of the `FlinkJedisConfigBase` too? This way `RedisSink#open()` can be even more simpler. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360838#comment-15360838 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69408897 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisCommandDescription getCommandDescription() { + * return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); + *} + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** +* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. +* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. +* But for {@link RedisDataType#HASH} and {@link
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360835#comment-15360835 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69408867 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * The sink takes two arguments {@link FlinkJedisConfigBase} and {@link RedisMapper}. + * When {@link FlinkJedisPoolConfig} is passed as the first argument, + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. Please use this when + * you want to connect to a single Redis server. + * When {@link FlinkJedisSentinelConfig} is passed as the first argument, the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool}. Please use this when you want to connect to Sentinel. + * Please use {@link FlinkJedisClusterConfig} as the first argument if you want to connect to + * a Redis Cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } --- End diff -- Wrong constructor name in the example in the Javadoc? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15360821#comment-15360821 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69407871 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,177 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} +Version Compatibility: This module is compatible with redis 2.8.5 . --- End diff -- "redis", R should always be capitalized > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359975#comment-15359975 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69372418 --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml --- @@ -0,0 +1,86 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors + 1.1-SNAPSHOT + .. + + + flink-connector-redis_2.10 + flink-connector-redis + + jar + + + 2.8.0 --- End diff -- added in doc. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359974#comment-15359974 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69372416 --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml --- @@ -0,0 +1,86 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors + 1.1-SNAPSHOT + .. + + + flink-connector-redis_2.10 + flink-connector-redis + + jar + + + 2.8.0 + + + + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} --- End diff -- done. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358660#comment-15358660 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69266892 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private JedisCluster jedisCluster; + + /** +* Initialize Redis command container for Redis cluster. +* +* @param jedisCluster JedisCluster instance +*/ + public RedisClusterContainer(JedisCluster jedisCluster) { + Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null"); + + this.jedisCluster = jedisCluster; + } + + @Override + public void hset(final String key, final String hashField, final String value) { + try { + jedisCluster.hset(key, hashField, value); + } catch (Exception e) { --- End diff -- From the look of the Jedis code, all exceptions thrown by Jedis are `RuntimeException`s: https://github.com/xetorthio/jedis/tree/master/src/main/java/redis/clients/jedis/exceptions, so the API isn't informing to expect any exceptions to catch. I think for this first version on the Redis Sink, we should simply fail (and log the failure) by rethrowing the exception. For users I think it'd be better to offer "simply log failure and not hard fail" as a user option, like what the Kafka producer does (we can perhaps add this functionality later on). > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358383#comment-15358383 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69247416 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private JedisCluster jedisCluster; + + /** +* Initialize Redis command container for Redis cluster. +* +* @param jedisCluster JedisCluster instance +*/ + public RedisClusterContainer(JedisCluster jedisCluster) { + Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null"); + + this.jedisCluster = jedisCluster; + } + + @Override + public void hset(final String key, final String hashField, final String value) { + try { + jedisCluster.hset(key, hashField, value); + } catch (Exception e) { --- End diff -- I am quite unsure what to do. Jedis did not implemented any checked or unchecked exception. Should we go to the same line or log it and re-throw the exception. @mjsax, @tzulitai what do you think? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358366#comment-15358366 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69246112 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +/** + * Configuration for Jedis Pool. + */ +public class FlinkJedisPoolConfig implements Serializable { --- End diff -- @tzulitai `soTimeout` is extra config for `JedisSentinel`. Other common configs i am moving to base class. Really like it. thank u. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357270#comment-15357270 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 @subhankarb We should also add Redis Sink to the fault tolerance guarantee table for the connectors in the documentation. It can be found at `flink/docs/api/streaming/fault_tolerance.md`. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357253#comment-15357253 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69152556 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available commands for Redis. Each command belongs to a {@link RedisDataType} group. + */ +public enum RedisCommand { + + /** +* Insert the specified value at the head of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operations. +*/ + LPUSH(RedisDataType.LIST), + /** +* Insert the specified value at the tail of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +*/ + RPUSH(RedisDataType.LIST), + + /** +* Add the specified member to the set stored at key. +* Specified member that is already a member of this set is ignored. +*/ + SADD(RedisDataType.SET), + + /** +* Set key to hold the string value. If key already holds a value, +* it is overwritten, regardless of its type. +*/ + SET(RedisDataType.STRING), + + /** +* Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument. +*/ + PFADD(RedisDataType.HYPER_LOG_LOG), + + /** +* Posts a message to the given channel. +*/ + PUBLISH(RedisDataType.PUBSUB), + + /** +* Adds the specified members with the specified score to the sorted set stored at key. +*/ + ZADD(RedisDataType.SORTED_SET), + + /** +* Sets field in the hash stored at key to value. If key does not exist, +* a new key holding a hash is created. If field already exists in the hash, it is overwritten. +*/ + HSET(RedisDataType.HASH); + + /** +* The {@link RedisDataType} this command belongs to +*/ + private RedisDataType redisDataType; + + RedisCommand(RedisDataType redisDataType) { + this.redisDataType = redisDataType; + } + + + public boolean isInRedisDataType(RedisDataType redisDataType) { + return this.redisDataType == redisDataType; + } + + /** +* The {@link RedisDataType} this command belongs to. +* @return the {@link RedisDataType} + */ --- End diff -- Indentation error: should be tab followed by a space > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357245#comment-15357245 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 Hi @mjsax @subhankarb, Gave a first run-through on the code, please let me know if you have any questions on the comments. I've also tested the connector again, on a local single-node & cluster installation, seems to work fine. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357233#comment-15357233 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69151056 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * When creating the sink using first constructor {@link #RedisSink(FlinkJedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * When using second constructor {@link #RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if Redis is + * configured using sentinels else use the third constructor {@link #RedisSink(FlinkJedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); + *} + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** +* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. +* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. +* But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. +* For {@link RedisDataType#HASH} we need hash name, hash key and element.
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357230#comment-15357230 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69150728 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available commands for Redis. Each command belongs to a {@link RedisDataType} group. + */ +public enum RedisCommand { + + /** +* Insert the specified value at the head of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operations. +*/ + LPUSH(RedisDataType.LIST), + /** +* Insert the specified value at the tail of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +*/ + RPUSH(RedisDataType.LIST), + + /** +* Add the specified member to the set stored at key. +* Specified member that is already a member of this set is ignored. +*/ + SADD(RedisDataType.SET), + + /** +* Set key to hold the string value. If key already holds a value, +* it is overwritten, regardless of its type. +*/ + SET(RedisDataType.STRING), + + /** +* Adds the element to the HyperLogLog data structure stored at the variable name specified as first argument. +*/ + PFADD(RedisDataType.HYPER_LOG_LOG), + + /** +* Posts a message to the given channel. +*/ + PUBLISH(RedisDataType.PUBSUB), + + /** +* Adds the specified members with the specified score to the sorted set stored at key. +*/ + ZADD(RedisDataType.SORTED_SET), + + /** +* Sets field in the hash stored at key to value. If key does not exist, +* a new key holding a hash is created. If field already exists in the hash, it is overwritten. +*/ + HSET(RedisDataType.HASH); + + /** +* The {@link RedisDataType} this command belongs to +*/ + private RedisDataType redisDataType; + + RedisCommand(RedisDataType redisDataType) { + this.redisDataType = redisDataType; + } + + + public boolean isInRedisDataType(RedisDataType redisDataType) { + return this.redisDataType == redisDataType; + } + + /** +* The {@link RedisDataType} this command belongs to. +* @return the {@link RedisDataType} + */ + public RedisDataType getRedisDataType(){ --- End diff -- This method isn't used anywhere in the code apart from tests. Perhaps we can drop the `isInRedisDataType` method, and use `getRedisDataType` in `RedisCommandDescription` to do the `HASH` / `SORTED_SET` check? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357221#comment-15357221 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69149847 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +/** + * All available commands for Redis. Each command belongs to a {@link RedisDataType} group. + */ +public enum RedisCommand { --- End diff -- From the code, it seems unclear why we are wrapping each enum value of `RedisCommand` around a `RedisDataType`. I understand there was previous discussion on having flexibility for multiple command per data type, but I'm not sure how this enum design relates to that. The `getRedisDataType()` of the enumeration isn't used anywhere in the code, or user-facing interfaces. Am I missing something? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357164#comment-15357164 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69142414 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The container for all available Redis Commands. + */ +public interface RedisCommandsContainer extends Serializable { + + /** +* Sets field in the hash stored at key to value. +* If key does not exist, a new key holding a hash is created. +* If field already exists in the hash, it is overwritten. +* +* @param key Hash name +* @param hashField Hash field +* @param value Hash value +*/ + void hset(String key, String hashField, String value); + + /** +* Insert the specified value at the tail of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +* +* @param listName Name of the List +* @param value Value to be added +*/ + void rpush(String listName, String value); + + /** +* Insert the specified value at the head of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +* +* @param listName Name of the List +* @param value Value to be added +*/ + void lpush(String listName, String value); + + /** +* Add the specified member to the set stored at key. +* Specified members that are already a member of this set are ignored. +* If key does not exist, a new set is created before adding the specified members. +* +* @param setName Name of the Set +* @param value Value to be added +*/ + void sadd(String setName, String value); + + /** +* Posts a message to the given channel. +* +* @param channelName Name of the channel to which data will be published +* @param message the message +*/ + void publish(String channelName, String message); --- End diff -- extra space before `publish` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357162#comment-15357162 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69142275 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private JedisCluster jedisCluster; + + /** +* Initialize Redis command container for Redis cluster. +* +* @param jedisCluster JedisCluster instance +*/ + public RedisClusterContainer(JedisCluster jedisCluster) { + Preconditions.checkNotNull(jedisCluster, "Jedis cluster can not be null"); + + this.jedisCluster = jedisCluster; + } + + @Override + public void hset(final String key, final String hashField, final String value) { + try { + jedisCluster.hset(key, hashField, value); + } catch (Exception e) { --- End diff -- Seems like we aren't failing on record failures for all command types. Should we also document this behaviour? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357136#comment-15357136 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69138670 --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml --- @@ -0,0 +1,86 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors + 1.1-SNAPSHOT + .. + + + flink-connector-redis_2.10 + flink-connector-redis + + jar + + + 2.8.0 --- End diff -- What Redis versions are we supporting with this Jedis version? Perhaps we should have info about this in the documentation. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357121#comment-15357121 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69137523 --- Diff: flink-streaming-connectors/flink-connector-redis/pom.xml --- @@ -0,0 +1,86 @@ + + +http://maven.apache.org/POM/4.0.0"; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + 4.0.0 + + + org.apache.flink + flink-streaming-connectors + 1.1-SNAPSHOT + .. + + + flink-connector-redis_2.10 + flink-connector-redis + + jar + + + 2.8.0 + + + + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} --- End diff -- this should be in the `provided` scope. Connector jars don't need to be packaged with the streaming java API jar. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357105#comment-15357105 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69135245 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisMapper.java --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * Function that creates the description how the input data should be mapped to redis type. + *Example: + *{@code + *private static class RedisTestMapper implements RedisMapper> { + *public RedisDataTypeDescription getDataTypeDescription() { + *return new RedisDataTypeDescription(RedisCommand.PUBLISH); + *} + *public String getKeyFromData(Tuple2 data) { + *return data.f0; + *} + *public String getValueFromData(Tuple2 data) { + *return data.f1; + *} + *} + *} + * + * @param The type of the element handled by this {@code RedisMapper} + */ +public interface RedisMapper extends Function, Serializable { + + /** +* Returns descriptor which defines data type. +* +* @return data type descriptor +*/ + RedisCommandDescription getDataTypeDescription(); --- End diff -- Now that this is renamed to `RedisCommandDescription`, should `getDataTypeDescription` be renamed accordingly too? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357102#comment-15357102 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69134232 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * When creating the sink using first constructor {@link #RedisSink(FlinkJedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * When using second constructor {@link #RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if Redis is + * configured using sentinels else use the third constructor {@link #RedisSink(FlinkJedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); + *} + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** +* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. +* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. +* But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. +* For {@link RedisDataType#HASH} we need hash name, hash key and element.
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357099#comment-15357099 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69134099 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * When creating the sink using first constructor {@link #RedisSink(FlinkJedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * When using second constructor {@link #RedisSink(FlinkJedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if Redis is + * configured using sentinels else use the third constructor {@link #RedisSink(FlinkJedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleMapper implements RedisMapper> { + * + * private RedisCommand redisCommand; + * + * public RedisAdditionalDataMapper(RedisCommand redisCommand){ + * this.redisCommand = redisCommand; + * } + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(redisCommand, REDIS_ADDITIONAL_KEY); + * } + * public String getKeyFromData(Tuple2 data) { + * return data.f0; + * } + * public String getValueFromData(Tuple2 data) { + * return data.f1; + * } + *} + *JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + *.setHost(REDIS_HOST).setPort(REDIS_PORT).build(); + *new RedisSink(jedisPoolConfig, new RedisExampleDataMapper(RedisCommand.LPUSH)); + *} + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + /** +* This additional key needed for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET}. +* Other {@link RedisDataType} works only with two variable i.e. name of the list and value to be added. +* But for {@link RedisDataType#HASH} and {@link RedisDataType#SORTED_SET} we need three variables. +* For {@link RedisDataType#HASH} we need hash name, hash key and element.
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357089#comment-15357089 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r69132723 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.util.Preconditions; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +/** + * Configuration for Jedis Pool. + */ +public class FlinkJedisPoolConfig implements Serializable { --- End diff -- The 3 configuration classes seems to have quite a bit of duplicate code, as well as inconsistency between them. Ex. All 3 have getters / setters for `timeout`, `maxTotal`, `maxIdle`, etc., while the getter / setter method for `timeout` is sometimes called `getSoTimeout` and sometimes `getTimeout`. Can we implement the shared settings in a base class, and let the 3 types of configuration extend from that? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357039#comment-15357039 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax I think the failing `JMXReporterTest.testJMXAvailability` was just hotfixed with this commit yesterday: https://github.com/apache/flink/commit/53630da01bcbfe05eda90869b1198b4e1c554a86 > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357030#comment-15357030 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 Sure, I'll give a full review now. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15356225#comment-15356225 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 @rmetzger What about this failing tests... ``` JMXReporterTest.testJMXAvailability:148 » Runtime Could not start JMX server o... ``` Seems, there is no JIRA -- known issue? -- or no issue? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352920#comment-15352920 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax , @rmetzger plz review. The changed model is described in the PR description. thanks, subhankar > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15351701#comment-15351701 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 My two cents: 1) seems to got sorted out (thx @tzulitai for the input!) 2) I personally do not care too much about the name conflict. Reusing the same class for sink and source sounds reasonable. Maybe `FlinkRedisConfig` as name? 3) Agreed. `PUBSUB` as datatpye is fine IMHO (so we can use same type for source and sink, which makes it clear if PUB or SUB is used) 4 + 5 ) Your suggestions sound good to me. Please update PR so we can get better lock and feel for it :) @tzulitai @subhankarb Thanks a lot! > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350742#comment-15350742 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68551999 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, plz use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Plz use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Use this constructor if to connect with single Redis server. +* +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + } + + /** +* Use this constructor if Redis environment is clustered with sentinels. +* +* @param sentinelPool SentinelPool which actually manages Jedis instances +*/ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisSentinelPool = sentinelPool; + } + + /** +* Closes the Jedis instances. +*/ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String hashName, final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(hashName, key, value); --- End diff -- The `RedisDataTypeDescription.additionalKey` is used for the key. Yah the param names are confusing. i am making the changes according to the same line the client. Thank you very much. :) > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350696#comment-15350696 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68546159 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, plz use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Plz use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Use this constructor if to connect with single Redis server. +* +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + } + + /** +* Use this constructor if Redis environment is clustered with sentinels. +* +* @param sentinelPool SentinelPool which actually manages Jedis instances +*/ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisSentinelPool = sentinelPool; + } + + /** +* Closes the Jedis instances. +*/ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String hashName, final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(hashName, key, value); --- End diff -- Is it intended to use the `additionalKey` as the primary key for `HASH` too? If yes, the naming here is very confusing. `hashName` / `hashField` should refer to the inner secondary key. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350685#comment-15350685 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68545039 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, plz use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Plz use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Use this constructor if to connect with single Redis server. +* +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + } + + /** +* Use this constructor if Redis environment is clustered with sentinels. +* +* @param sentinelPool SentinelPool which actually manages Jedis instances +*/ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisSentinelPool = sentinelPool; + } + + /** +* Closes the Jedis instances. +*/ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String hashName, final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(hashName, key, value); --- End diff -- The argument order here is wrong. Jedis takes the argument for hset in this order: `(key, hashField, value)`. The primary key goes first, then the secondary. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350673#comment-15350673 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 Hi @subhankarb, Sorry, I just read the code and didn't realize you were using the `additionalKey` as the `setName` for `SORTED_SET`. I think this might be quite confusing for Redis users. For Redis users, for sorted sets the `setName` is considered as the primary key, and the per-value score in the set is considered the additional / secondary key. It works the same for the hash data type: the hash fields are considered additional / secondary keys. So right now, the usage principal for `SORTED_SET` and `HASH` seems to be opposite to each other? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350646#comment-15350646 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 @tzulitai, @mjsax thank you very much for your valuable feedback. 1) `additionalKey` was supposed to one time declaration for `SORTED_SET` and `HASH`. For `HASH` it is the value of hash name and `SORTED_SET` it is the name of the set. I assume it would not change once it declares. 2) `JedisPoolConfig`/ `JedisClusterConfig` were not `Serializable` so i needed a wrapper class for that. @mjsax there is a story for a source for Redis, so `RedisSinkConfig` would not be a valid one as the config would be same for sink and source. Thats why i make the `JedisPoolConfig`/ `JedisClusterConfig` ctor private. So that user forced to use the builder class to avoid confusion. 3) When i started with this i thought it was obvious that if i use PUBSUB in sink, i would always use it for publish and if i use this in source i would always use it for subscribe :) . 4 + 5 ) We can group the command and dataType like. `public enum RedisCommand { LPUSH(RedisDataType.LIST), RPUSH(RedisDataType.LIST); private RedisDataType redisDataType; RedisCommand(RedisDataType redisDataType) { this.redisDataType = redisDataType; } public boolean isInRedisDataType(RedisDataType redisDataType) { return this.redisDataType == redisDataType; } }` And in `RedisDataTypeDescription` we can extract the command . So that in future we can add new command. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349640#comment-15349640 ] ASF GitHub Bot commented on FLINK-3034: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/1813 Hi @mjsax, Regarding 1): just came to me that for SORTED_SET, a fixed secondary key doesn't make sense, because the secondary key is supposed to define the order of the inserted element in the set. For HASH, I guess it depends on the use case. For example, a stream of user events with user id can update Redis using the user id as primary key, and inner hash field (secondary key) depending on what event occurred. I agree with your suggestion to settle with a subset of commands to start with, and if we want to, make it more flexible for multiple command per data type with new JIRAs as it would probably need larger refactoring. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349615#comment-15349615 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 @tzulitai Thank a lot for testing this! Your feedback is really great! 1) I am not a Redis users either -- from my understanding, the second key determined the field that would be updated -- it makes sense to me, that this field is the same for all updates (ie, independent of the data itself). Thus, it would not make sense to extract it from the record via `RedisMapper`. 2) Might be confusing for Redis users (I did not stumble over it as a no-user ;)) -- renaming seems reasonable -- maybe `RedisSinkConfig` ? 3+4) Well, the data type is the same, no matter if you want to read or write it. But I agree, that using the date type itself in `RedisSink` might not be sufficient. I think the idea to support `LPUSH` was, that Flink as a streaming system might do best to append at the tail, not at the head... 5) Similar to my answer to (4) -- the data type is the same -- independently what operation you perform on it. And you state yourself, that `RedisMapper` maps the type to the command/action. 6) See my answer to (1) As a non-Redis users, it is hard for me to judge what flexibility is required/expected by users (ie, secondary key per record or not?). It also seems, that not all available action (LPUSH vs RPUSH) are implemented. How important is it, to support all action -- do you think, that the current once cover the most basic subset to get started with. It might be good, to just start with a subset of commands, and add new commands later on (just a suggestion). And do the think the implemented actions for each type are the most useful? If we want to support multiple different actions per data type, it would be a larger refactoring of this code I think. @subhankarb @rmetzger What is your opinion on this? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15349055#comment-15349055 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68484062 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a single Redis server or to Redis sentinels + * If want to connect to a single Redis server, plz use the first constructor {@link #RedisContainer(JedisPool)}. + * If want to connect to a Redis sentinels, Plz use the second constructor {@link #RedisContainer(JedisSentinelPool)} + */ +public class RedisContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Use this constructor if to connect with single Redis server. +* +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null"); + this.jedisPool = jedisPool; + } + + /** +* Use this constructor if Redis environment is clustered with sentinels. +* +* @param sentinelPool SentinelPool which actually manages Jedis instances +*/ + public RedisContainer(final JedisSentinelPool sentinelPool) { + Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null"); + this.jedisSentinelPool = sentinelPool; + } + + /** +* Closes the Jedis instances. +*/ + @Override + public void close() throws IOException { + if (this.jedisPool != null) { + this.jedisPool.close(); + } + if (this.jedisSentinelPool != null) { + this.jedisSentinelPool.close(); + } + } + + @Override + public void hset(final String hashName, final String key, final String value) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.hset(hashName, key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and field {} error message {}", + key, key, e.getMessage()); + } + } finally { + releaseInstance(jedis); --- End diff -- `releaseInstance(jedis)` use jedis `close()` method. The close method return the resource to connection pool. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connect
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348606#comment-15348606 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 @rmetzger Thanks :) I did not test with a Redis cluster or similar. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348422#comment-15348422 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 Please address last comment. Otherwise, LGTM. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348407#comment-15348407 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68412669 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + --- End diff -- You can actually remove the whole JavaDoc comment (and it will inherit automatically) If you specify a new JavaDoc it replaces the old one; using `inheritDoc` tag you can include the original JavaDoc to extent it. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343862#comment-15343862 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax plz review. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343860#comment-15343860 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68008039 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + --- End diff -- replaced with {@inheritDoc} > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343857#comment-15343857 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r68007965 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.util.Set; + +/** + * Configuration for Jedis Sentinel Pool. + */ +public class JedisSentinelConfig implements Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelConfig.class); + + private String masterName; + private Set sentinels; + private int connectionTimeout; + private int soTimeout; + private String password; + private int database; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* The master name and sentinels are mandatory, and when you didn't set these, it throws NullPointerException. +* +* @param masterName master name of the replica set +* @param sentinels set of sentinel hosts +* @param connectionTimeout timeout connection timeout +* @param soTimeout timeout socket timeout +* @param password password password, if any +* @param database database database index +* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool +* @param maxIdle the cap on the number of "idle" instances in the pool +* @param minIdle the minimum number of idle objects to maintain in the pool +* --- End diff -- fixed > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343708#comment-15343708 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67994813 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private JedisCluster jedisCluster; + + /** +* Initialize Redis command container for Redis cluster. +* +* @param jedisCluster JedisCluster instance +*/ + public RedisClusterContainer(JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; + } + + /** +* Sets field in the hash stored at key to value. +* If key does not exist, a new key holding a hash is created. +* If field already exists in the hash, it is overwritten. +* +* @param hashName Hash name +* @param key Hash field name +* @param value Hash value +*/ + @Override + public void hset(final String hashName, final String key, final String value) { + try { + jedisCluster.hset(hashName, key, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to hash {} error message {}", + hashName, key, e.getMessage()); + } + } + } + + /** +* Insert all the specified values at the tail of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +* +* @param listName Name of the List +* @param value Value to be added +*/ + @Override + public void rpush(final String listName, final String value) { + try { + jedisCluster.rpush(listName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message: {}", + listName, e.getMessage()); + } + } + } + + /** +* Add the specified members to the set stored at key. +* Specified members that are already a member of this set are ignored. +* If key does not exist, a new set is created before adding the specified members. +* +* @param setName Name of the Set +* @param value Value to be added +*/ + @Override + public void sadd(final String setName, final String value) { + try { + jedisCluster.sadd(setName, value); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to set {} error message {}", + setName, e.getMessage()); + } + } + } + + /** +
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342139#comment-15342139 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67905768 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Redis command container if we want to connect to a Redis cluster. + */ +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterContainer.class); + + private JedisCluster jedisCluster; + + /** +* Initialize Redis command container for Redis cluster. +* +* @param jedisCluster JedisCluster instance +*/ + public RedisClusterContainer(JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; --- End diff -- Already checked in RedisCommandsContainerBuilder.build method > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15342111#comment-15342111 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67904127 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for Jedis Cluster. + */ +public class JedisClusterConfig implements Serializable { + private static final long serialVersionUID = 1L; + + private Set nodes; + private int timeout; + private int maxRedirections; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* The list of node is mandatory, and when nodes is not set, it throws NullPointerException. --- End diff -- The constructor is private. So user has to set everything through builder class except nodes. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15341275#comment-15341275 ] ASF GitHub Bot commented on FLINK-3034: --- Github user subhankarb commented on the issue: https://github.com/apache/flink/pull/1813 @mjsax wow... that's a lot. thanks you very very much for your time. i am fixing ASAP. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338767#comment-15338767 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the issue: https://github.com/apache/flink/pull/1813 Most comments are nit or minor. Please fix. Otherwise, looks good. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338764#comment-15338764 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67622740 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisSinkTest extends RedisTestBase { + + private JedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.LIST)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.SET)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.HYPER_LOG_LOG)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception { + DataStreamSo
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338760#comment-15338760 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67622724 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisSinkTest extends RedisTestBase { + + private JedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.LIST)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.SET)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.HYPER_LOG_LOG)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception { + DataStreamSo
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338761#comment-15338761 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67622731 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisSinkTest extends RedisTestBase { + + private JedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.LIST)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.SET)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.HYPER_LOG_LOG)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception { + DataStreamSo
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338762#comment-15338762 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67622734 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisSinkTest extends RedisTestBase { + + private JedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.LIST)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.SET)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.HYPER_LOG_LOG)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception { + DataStreamSo
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338763#comment-15338763 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67622738 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import redis.clients.jedis.Jedis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisSinkTest extends RedisTestBase { + + private JedisPoolConfig jedisPoolConfig; + private static final Long NUM_ELEMENTS = 20L; + private static final String REDIS_KEY = "TEST_KEY"; + private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; + + StreamExecutionEnvironment env; + + + private Jedis jedis; + + @Before + public void setUp(){ + jedisPoolConfig = new JedisPoolConfig.Builder() + .setHost(REDIS_HOST) + .setPort(REDIS_PORT).build(); + jedis = new Jedis(REDIS_HOST, REDIS_PORT); + env = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + @Test + public void testRedisListDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.LIST)); + + source.addSink(redisSink); + env.execute("Test Redis List Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.llen(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSetDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.SET)); + + source.addSink(redisSink); + env.execute("Test Redis Set Data Type"); + + assertEquals(NUM_ELEMENTS, jedis.scard(REDIS_KEY)); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisHyperLogLogDataType() throws Exception { + DataStreamSource> source = env.addSource(new TestSourceFunction()); + RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + new RedisDataMapper(RedisDataType.HYPER_LOG_LOG)); + + source.addSink(redisSink); + env.execute("Test Redis Hyper Log Log Data Type"); + + assertEquals(NUM_ELEMENTS, Long.valueOf(jedis.pfcount(REDIS_KEY))); + + jedis.del(REDIS_KEY); + } + + @Test + public void testRedisSortedSetDataType() throws Exception { + DataStreamSo
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15338759#comment-15338759 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r67622681 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.redis; + +import com.google.common.base.Preconditions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to Redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to Redis cluster. + * + * Example: + * + * + *{@code + *public static class RedisExampleDataMapper implements RedisMapper> { + *public RedisDataTypeDescription getDataTypeDescription() { + *return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + *} + *public String getKeyFromData(Tuple2 data) { + *return String.valueOf(data.f0); + *} + *public String getValueFromData(Tuple2 data) { + *return String.valueOf(data.f1); --- End diff -- return data.f0; > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)