[GitHub] flink pull request: [FLINK-3541] [Kafka Connector] Clean up workar...
GitHub user skyahead opened a pull request: https://github.com/apache/flink/pull/1846 [FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsu⦠â¦mer09 You can merge this pull request into a Git repository by running: $ git pull https://github.com/skyahead/flink FLINK-3541 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1846.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1846 commit 49c291a5bd06f1468bcee40f03a7bbea3bb1be29 Author: Tianji LiDate: 2016-04-01T04:35:39Z [FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3541) Clean up workaround in FlinkKafkaConsumer09
[ https://issues.apache.org/jira/browse/FLINK-3541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221142#comment-15221142 ] ASF GitHub Bot commented on FLINK-3541: --- GitHub user skyahead opened a pull request: https://github.com/apache/flink/pull/1846 [FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsu… …mer09 You can merge this pull request into a Git repository by running: $ git pull https://github.com/skyahead/flink FLINK-3541 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1846.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1846 commit 49c291a5bd06f1468bcee40f03a7bbea3bb1be29 Author: Tianji LiDate: 2016-04-01T04:35:39Z [FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09 > Clean up workaround in FlinkKafkaConsumer09 > > > Key: FLINK-3541 > URL: https://issues.apache.org/jira/browse/FLINK-3541 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Priority: Minor > > In the current {{FlinkKafkaConsumer09}} implementation, we repeatedly start a > new {{KafkaConsumer}} if the method {{KafkaConsumer.partitionsFor}} returns a > NPE. This is due to a bug with the Kafka version 0.9.0.0. See > https://issues.apache.org/jira/browse/KAFKA-2880. The code can be found in > the constructor of {{FlinkKafkaConsumer09.java:208}}. > However, the problem is marked as fixed for version 0.9.0.1, which we also > use for the flink-connector-kafka. Therefore, we should be able to get rid of > the workaround. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3635) Potential null deference in TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap
[ https://issues.apache.org/jira/browse/FLINK-3635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221038#comment-15221038 ] ASF GitHub Bot commented on FLINK-3635: --- GitHub user skyahead opened a pull request: https://github.com/apache/flink/pull/1845 [FLINK-3635] Potential null deference in TwitterExample#SelectEnglish… …AndTokenizeFlatMap#flatMap You can merge this pull request into a Git repository by running: $ git pull https://github.com/skyahead/flink FLINK-3635 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1845.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1845 commit 48a8104361341da2eadd8c50c5761d47c64f4651 Author: Tianji LiDate: 2016-04-01T02:38:23Z [FLINK-3635] Potential null deference in TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap > Potential null deference in > TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap > -- > > Key: FLINK-3635 > URL: https://issues.apache.org/jira/browse/FLINK-3635 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class); > if (jsonNode.has("user") && > jsonNode.get("user").get("lang").asText().equals("en")) { > {code} > jsonNode.get("user").get("lang") may return null, leading to > NullPointerException. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3635] Potential null deference in Twitt...
GitHub user skyahead opened a pull request: https://github.com/apache/flink/pull/1845 [FLINK-3635] Potential null deference in TwitterExample#SelectEnglish⦠â¦AndTokenizeFlatMap#flatMap You can merge this pull request into a Git repository by running: $ git pull https://github.com/skyahead/flink FLINK-3635 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1845.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1845 commit 48a8104361341da2eadd8c50c5761d47c64f4651 Author: Tianji LiDate: 2016-04-01T02:38:23Z [FLINK-3635] Potential null deference in TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3523) Storm SpoutSplitExample fails with a ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-3523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220858#comment-15220858 ] ASF GitHub Bot commented on FLINK-3523: --- GitHub user mjsax opened a pull request: https://github.com/apache/flink/pull/1844 [FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program… … to get rid of SplitStreamType wrapper You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/flink flink-3523-split-example Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1844.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1844 commit 8e518b548909b4604a427c3aca5fb2d33aabbb8f Author: mjsaxDate: 2016-03-31T23:23:30Z [FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program to get rid of SplitStreamType wrapper > Storm SpoutSplitExample fails with a ClassCastException > --- > > Key: FLINK-3523 > URL: https://issues.apache.org/jira/browse/FLINK-3523 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Stephan Ewen >Assignee: Matthias J. Sax > > {code} > Caused by: java.lang.ClassCastException: > org.apache.flink.storm.util.SplitStreamType cannot be cast to > java.lang.Integer > at > org.apache.flink.storm.wrappers.StormTuple.getInteger(StormTuple.java:152) > at > org.apache.flink.storm.split.operators.VerifyAndEnrichBolt.execute(VerifyAndEnrichBolt.java:52) > at > org.apache.flink.storm.wrappers.BoltWrapper.processElement(BoltWrapper.java:313) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3523] [Storm-Compatibility] Added Split...
GitHub user mjsax opened a pull request: https://github.com/apache/flink/pull/1844 [FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program⦠⦠to get rid of SplitStreamType wrapper You can merge this pull request into a Git repository by running: $ git pull https://github.com/mjsax/flink flink-3523-split-example Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1844.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1844 commit 8e518b548909b4604a427c3aca5fb2d33aabbb8f Author: mjsaxDate: 2016-03-31T23:23:30Z [FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program to get rid of SplitStreamType wrapper --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Assigned] (FLINK-3523) Storm SpoutSplitExample fails with a ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-3523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned FLINK-3523: -- Assignee: Matthias J. Sax > Storm SpoutSplitExample fails with a ClassCastException > --- > > Key: FLINK-3523 > URL: https://issues.apache.org/jira/browse/FLINK-3523 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Stephan Ewen >Assignee: Matthias J. Sax > > {code} > Caused by: java.lang.ClassCastException: > org.apache.flink.storm.util.SplitStreamType cannot be cast to > java.lang.Integer > at > org.apache.flink.storm.wrappers.StormTuple.getInteger(StormTuple.java:152) > at > org.apache.flink.storm.split.operators.VerifyAndEnrichBolt.execute(VerifyAndEnrichBolt.java:52) > at > org.apache.flink.storm.wrappers.BoltWrapper.processElement(BoltWrapper.java:313) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2338) Shut down "Storm Topologies" cleanly
[ https://issues.apache.org/jira/browse/FLINK-2338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax closed FLINK-2338. -- Resolution: Fixed > Shut down "Storm Topologies" cleanly > > > Key: FLINK-2338 > URL: https://issues.apache.org/jira/browse/FLINK-2338 > Project: Flink > Issue Type: Improvement > Components: Storm Compatibility >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > Currently, it is not possible to stop a Flink streaming program in a clean > way. Thus, emulating Storm's "kill" command is done the "hard way" resulting > in the following exception shown in the log: > org.apache.flink.runtime.client.JobExecutionException: Communication with > JobManager failed: Lost connection to JobManager akka://flink/user/jobmanager > at > org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:169) > at > org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205) > at > org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195) > at > org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:116) > Caused by: java.lang.Exception: Lost connection to JobManager > akka://flink/user/jobmanager > at > org.apache.flink.runtime.client.JobClientActor.onReceive(JobClientActor.java:131) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at > akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46) > at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369) > at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501) > at akka.actor.ActorCell.invoke(ActorCell.scala:486) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > The exception is expected currently. However, a clean "kill" is preferable. > This can done after the new STOP signal is available > (https://issues.apache.org/jira/browse/FLINK-2111). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220550#comment-15220550 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-204094107 One additional overall comment: please use consistent JavaDoc formatting with respect to whitespaced and emtpy lines (sometime you have empty lines before parameter list; somtimes not) and markup (eg, `` vs `` -- you use bot). Try to be consistent; I did not comment on every single inconsistency... Also, write all JavaDoc as complete sentences and add full stops at the end. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/1813#issuecomment-204094107 One additional overall comment: please use consistent JavaDoc formatting with respect to whitespaced and emtpy lines (sometime you have empty lines before parameter list; somtimes not) and markup (eg, `` vs `` -- you use bot). Try to be consistent; I did not comment on every single inconsistency... Also, write all JavaDoc as complete sentences and add full stops at the end. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220544#comment-15220544 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114600 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.streaming.connectors.redis.RedisSinkTest; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisDataTypeDescriptionTest { --- End diff -- add `extends TestLogger` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114600 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java --- @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.streaming.connectors.redis.RedisSinkTest; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RedisDataTypeDescriptionTest { --- End diff -- add `extends TestLogger` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114545 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +public class JedisSentinelConfigTest { --- End diff -- add `extends TestLogger` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220542#comment-15220542 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114513 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +public class JedisPoolConfigTest { --- End diff -- add `extends TestLogger` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220541#comment-15220541 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114500 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +public class JedisClusterConfigTest { --- End diff -- add `extends TestLogger` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220543#comment-15220543 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114545 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +public class JedisSentinelConfigTest { --- End diff -- add `extends TestLogger` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114513 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +public class JedisPoolConfigTest { --- End diff -- add `extends TestLogger` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58114500 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import org.junit.Test; + +public class JedisClusterConfigTest { --- End diff -- add `extends TestLogger` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220537#comment-15220537 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113913 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.RedisCluster; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public class RedisSentinelClusterTest { + + private static RedisCluster cluster; + private static final String REDIS_MASTER = "master"; + private static final List sentinels = Arrays.asList(getAvailablePort(), getAvailablePort()); + private static final List group1 = Arrays.asList(getAvailablePort(), getAvailablePort()); + + private JedisSentinelPool jedisSentinelPool; + private JedisSentinelConfig jedisSentinelConfig; + + @BeforeClass + public static void setUpCluster(){ + cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1) + .serverPorts(group1).replicationGroup(REDIS_MASTER, 1) + .build(); + cluster.start(); + } + + @Before + public void setUp() { + Set hosts = JedisUtil.sentinelHosts(cluster); + jedisSentinelConfig = new JedisSentinelConfig.Builder().setMasterName(REDIS_MASTER) + .setSentinels(hosts).build(); + jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels()); + } + + @Test + public void testRedisSentinelOperation() { + RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig); + Jedis jedis = null; + try{ + jedis = jedisSentinelPool.getResource(); + redisContainer.set("testKey", "testValue"); --- End diff -- use variables for testKey and testValue to avoid typos (in case test is extended later on) > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113913 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.RedisCluster; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public class RedisSentinelClusterTest { + + private static RedisCluster cluster; + private static final String REDIS_MASTER = "master"; + private static final List sentinels = Arrays.asList(getAvailablePort(), getAvailablePort()); + private static final List group1 = Arrays.asList(getAvailablePort(), getAvailablePort()); + + private JedisSentinelPool jedisSentinelPool; + private JedisSentinelConfig jedisSentinelConfig; + + @BeforeClass + public static void setUpCluster(){ + cluster = RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1) + .serverPorts(group1).replicationGroup(REDIS_MASTER, 1) + .build(); + cluster.start(); + } + + @Before + public void setUp() { + Set hosts = JedisUtil.sentinelHosts(cluster); + jedisSentinelConfig = new JedisSentinelConfig.Builder().setMasterName(REDIS_MASTER) + .setSentinels(hosts).build(); + jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), + jedisSentinelConfig.getSentinels()); + } + + @Test + public void testRedisSentinelOperation() { + RedisCommandsContainer redisContainer = RedisCommandsContainerBuilder.build(jedisSentinelConfig); + Jedis jedis = null; + try{ + jedis = jedisSentinelPool.getResource(); + redisContainer.set("testKey", "testValue"); --- End diff -- use variables for testKey and testValue to avoid typos (in case test is extended later on) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220535#comment-15220535 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113574 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.RedisCluster; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public class RedisSentinelClusterTest { --- End diff -- add `extends TestLogger` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113574 --- Diff: flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisSentinelPool; +import redis.embedded.RedisCluster; +import redis.embedded.util.JedisUtil; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.apache.flink.util.NetUtils.getAvailablePort; + +public class RedisSentinelClusterTest { --- End diff -- add `extends TestLogger` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220532#comment-15220532 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113357 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { + + private static final long serialVersionUID = 1L; + + private RedisDataType dataType; + private String additionalKey; + + /** +* Use this constructor when data type is HASH or SORTED_SET +* @param dataType the redis data type {@link RedisDataType} +* @param additionalKey additional key for Hash and Sorted set data type + */ + public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) { + this.dataType = dataType; + this.additionalKey = additionalKey; + + if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + /** +* Use this constructor when data type is not HASH or SORTED_SET +* @param dataType the redis data type {@link RedisDataType} + */ + public RedisDataTypeDescription(RedisDataType dataType) { + this(dataType, null); + } + + public RedisDataType getDataType() { --- End diff -- JavaDoc > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113357 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { + + private static final long serialVersionUID = 1L; + + private RedisDataType dataType; + private String additionalKey; + + /** +* Use this constructor when data type is HASH or SORTED_SET +* @param dataType the redis data type {@link RedisDataType} +* @param additionalKey additional key for Hash and Sorted set data type + */ + public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) { + this.dataType = dataType; + this.additionalKey = additionalKey; + + if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + /** +* Use this constructor when data type is not HASH or SORTED_SET +* @param dataType the redis data type {@link RedisDataType} + */ + public RedisDataTypeDescription(RedisDataType dataType) { + this(dataType, null); + } + + public RedisDataType getDataType() { --- End diff -- JavaDoc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220533#comment-15220533 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113371 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { + + private static final long serialVersionUID = 1L; + + private RedisDataType dataType; + private String additionalKey; + + /** +* Use this constructor when data type is HASH or SORTED_SET +* @param dataType the redis data type {@link RedisDataType} +* @param additionalKey additional key for Hash and Sorted set data type + */ + public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) { + this.dataType = dataType; + this.additionalKey = additionalKey; + + if (dataType == RedisDataType.HASH || dataType == RedisDataType.SORTED_SET) { + if (additionalKey == null) { + throw new IllegalArgumentException("Hash and Sorted Set should have additional key"); + } + } + } + + /** +* Use this constructor when data type is not HASH or SORTED_SET +* @param dataType the redis data type {@link RedisDataType} + */ + public RedisDataTypeDescription(RedisDataType dataType) { + this(dataType, null); + } + + public RedisDataType getDataType() { + return dataType; + } + + public String getAdditionalKey() { --- End diff -- JavaDoc > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220530#comment-15220530 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113261 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { --- End diff -- Why `Function`? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220528#comment-15220528 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113197 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java --- @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +public enum RedisDataType { --- End diff -- JavaDoc > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220527#comment-15220527 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113161 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +public class RedisContainer implements RedisCommandsContainer, Closeable{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Constructor +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + this.jedisPool = jedisPool; + } + + public RedisContainer(JedisSentinelPool sentinelPool){ + this.jedisSentinelPool = sentinelPool; + } + + + + @Override + public void close() throws IOException { + this.jedisPool.close(); + } + + /** +* Sets field in the hash stored at key to value. +* If key does not exist, a new key holding a hash is created. +* If field already exists in the hash, it is overwritten. +* +* @param hashName Hash name +* @param key Hash field name +* @param value Hash value +*/ + @Override + public void hset(String hashName, String key, String value) { + Jedis jedis = null; + try{ + jedis = getInstance(); + jedis.hset(hashName, key, value); + }catch (Exception e){ + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and field {} error message {}", + key, key, e.getMessage()); + } + }finally { + returnInstance(jedis); + } + } + + /** +* Insert all the specified values at the tail of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +* +* @param listName Name of the List +* @param valueValue to be added +*/ + @Override + public void rpush(String listName, String value) { + Jedis jedis = null; + try{ + jedis = getInstance(); + jedis.rpush(listName, value); + }catch (Exception e){ + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}", + listName, e.getMessage()); + } + }finally { + returnInstance(jedis); + } + } + + /** +* Add the specified members to the set stored at key. +* Specified members that are already a member of this set are ignored. +* If key does not exist, a new set is created before adding the specified members. +* +* @param setName Name of the Set +* @param value Value to be added +*/ + @Override +
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220529#comment-15220529 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113229 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { --- End diff -- JavaDoc > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113261 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { --- End diff -- Why `Function`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113229 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +public class RedisDataTypeDescription implements Function, Serializable { --- End diff -- JavaDoc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113197 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java --- @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.mapper; + +public enum RedisDataType { --- End diff -- JavaDoc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220522#comment-15220522 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112951 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +public class RedisContainer implements RedisCommandsContainer, Closeable{ --- End diff -- JavaDoc > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58113161 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +public class RedisContainer implements RedisCommandsContainer, Closeable{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class); + + private JedisPool jedisPool; + private JedisSentinelPool jedisSentinelPool; + + + /** +* Constructor +* @param jedisPool JedisPool which actually manages Jedis instances +*/ + public RedisContainer(JedisPool jedisPool) { + this.jedisPool = jedisPool; + } + + public RedisContainer(JedisSentinelPool sentinelPool){ + this.jedisSentinelPool = sentinelPool; + } + + + + @Override + public void close() throws IOException { + this.jedisPool.close(); + } + + /** +* Sets field in the hash stored at key to value. +* If key does not exist, a new key holding a hash is created. +* If field already exists in the hash, it is overwritten. +* +* @param hashName Hash name +* @param key Hash field name +* @param value Hash value +*/ + @Override + public void hset(String hashName, String key, String value) { + Jedis jedis = null; + try{ + jedis = getInstance(); + jedis.hset(hashName, key, value); + }catch (Exception e){ + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command HSET to key {} and field {} error message {}", + key, key, e.getMessage()); + } + }finally { + returnInstance(jedis); + } + } + + /** +* Insert all the specified values at the tail of the list stored at key. +* If key does not exist, it is created as empty list before performing the push operation. +* +* @param listName Name of the List +* @param valueValue to be added +*/ + @Override + public void rpush(String listName, String value) { + Jedis jedis = null; + try{ + jedis = getInstance(); + jedis.rpush(listName, value); + }catch (Exception e){ + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command RPUSH to list {} error message {}", + listName, e.getMessage()); + } + }finally { + returnInstance(jedis); + } + } + + /** +* Add the specified members to the set stored at key. +* Specified members that are already a member of this set are ignored. +* If key does not exist, a new set is created before adding the specified members. +* +* @param setName Name of the Set +* @param value Value to be added +*/ + @Override + public void sadd(String setName, String value) { + Jedis jedis = null; + try{ + jedis = getInstance(); + jedis.sadd(setName, value); + }catch (Exception e){
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112951 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +import java.io.Closeable; +import java.io.IOException; + +public class RedisContainer implements RedisCommandsContainer, Closeable{ --- End diff -- JavaDoc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112927 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +*/ + public static RedisCommandsContainer build(JedisPoolConfig jedisPoolConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** +* Builds container for Redis Cluster environment. +* @param config configuration for JedisCluster +* @return container for Redis Cluster environment +*/ + public static RedisCommandsContainer build(JedisClusterConfig config) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(config.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(config.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(config.getMinIdle()); + + JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), + config.getMaxRedirections(), genericObjectPoolConfig); + return new RedisClusterContainer(jedisCluster); + } + + /** +* Builds container for Redis Sentinel environment. +* @param config configuration for JedisSentinel +* @return container for Redis sentinel environment + */ + public static RedisCommandsContainer build(JedisSentinelConfig config) { --- End diff -- Check for `null` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220521#comment-15220521 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112927 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +*/ + public static RedisCommandsContainer build(JedisPoolConfig jedisPoolConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** +* Builds container for Redis Cluster environment. +* @param config configuration for JedisCluster +* @return container for Redis Cluster environment +*/ + public static RedisCommandsContainer build(JedisClusterConfig config) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(config.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(config.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(config.getMinIdle()); + + JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), + config.getMaxRedirections(), genericObjectPoolConfig); + return new RedisClusterContainer(jedisCluster); + } + + /** +* Builds container for Redis Sentinel environment. +* @param config configuration for JedisSentinel +* @return container for Redis sentinel environment + */ + public static RedisCommandsContainer build(JedisSentinelConfig config) { --- End diff -- Check for `null` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220518#comment-15220518 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112760 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +*/ + public static RedisCommandsContainer build(JedisPoolConfig jedisPoolConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); --- End diff -- check `jedisPoolConfig` for `null` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220520#comment-15220520 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112842 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +*/ + public static RedisCommandsContainer build(JedisPoolConfig jedisPoolConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** +* Builds container for Redis Cluster environment. +* @param config configuration for JedisCluster +* @return container for Redis Cluster environment +*/ + public static RedisCommandsContainer build(JedisClusterConfig config) { --- End diff -- check for `null` > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112842 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +*/ + public static RedisCommandsContainer build(JedisPoolConfig jedisPoolConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); + genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); + genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); + genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); + + JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), + jedisPoolConfig.getPort(), jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(), + jedisPoolConfig.getDatabase()); + return new RedisContainer(jedisPool); + } + + /** +* Builds container for Redis Cluster environment. +* @param config configuration for JedisCluster +* @return container for Redis Cluster environment +*/ + public static RedisCommandsContainer build(JedisClusterConfig config) { --- End diff -- check for `null` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112760 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { + + /** +* Builds container for single Redis environment. +* @param jedisPoolConfig configuration for JedisPool +* @return container for single Redis environment +*/ + public static RedisCommandsContainer build(JedisPoolConfig jedisPoolConfig) { + GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); --- End diff -- check `jedisPoolConfig` for `null` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220517#comment-15220517 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112656 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { --- End diff -- JavaDoc > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220515#comment-15220515 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112601 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import java.io.IOException; +import java.io.Serializable; + +public interface RedisCommandsContainer extends Serializable { --- End diff -- JavaDoc missing > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112601 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import java.io.IOException; +import java.io.Serializable; + +public interface RedisCommandsContainer extends Serializable { --- End diff -- JavaDoc missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112656 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisSentinelPool; + +public class RedisCommandsContainerBuilder { --- End diff -- JavaDoc --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112455 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { --- End diff -- JavaDoc missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220513#comment-15220513 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112455 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.container; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.JedisCluster; + +import java.io.Closeable; +import java.io.IOException; + +public class RedisClusterContainer implements RedisCommandsContainer, Closeable { --- End diff -- JavaDoc missing > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112296 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.util.Set; + +/** + * Configuration for JedisSentinelPool. + */ +public class JedisSentinelConfig implements Function, Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelConfig.class); + + private String masterName; + private Set sentinels; + private int connectionTimeout; + private int soTimeout; + private String password; + private int database; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* --- End diff -- Extend JavaDoc with at least one sentence. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220512#comment-15220512 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112296 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.util.Set; + +/** + * Configuration for JedisSentinelPool. + */ +public class JedisSentinelConfig implements Function, Serializable { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(JedisSentinelConfig.class); + + private String masterName; + private Set sentinels; + private int connectionTimeout; + private int soTimeout; + private String password; + private int database; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* --- End diff -- Extend JavaDoc with at least one sentence. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220510#comment-15220510 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112229 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.util.Set; + +/** + * Configuration for JedisSentinelPool. + */ +public class JedisSentinelConfig implements Function, Serializable { --- End diff -- `Function`? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220509#comment-15220509 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112099 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +/** + * Configuration for JedisPool. + */ +public class JedisPoolConfig implements Function, Serializable { + + private static final long serialVersionUID = 1L; + + private String host; + private int port; + private int timeout; + private int database; + private String password; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* Constructor --- End diff -- yup... can be omitted. > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112229 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.util.Set; + +/** + * Configuration for JedisSentinelPool. + */ +public class JedisSentinelConfig implements Function, Serializable { --- End diff -- `Function`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112099 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +/** + * Configuration for JedisPool. + */ +public class JedisPoolConfig implements Function, Serializable { + + private static final long serialVersionUID = 1L; + + private String host; + private int port; + private int timeout; + private int database; + private String password; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* Constructor --- End diff -- yup... can be omitted. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220506#comment-15220506 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111937 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +/** + * Configuration for JedisPool. + */ +public class JedisPoolConfig implements Function, Serializable { --- End diff -- Why `Function`? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112031 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for JedisCluster. + */ +public class JedisClusterConfig implements Function, Serializable { + private static final long serialVersionUID = 1L; + + + private Set nodes; + private int timeout; + private int maxRedirections; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* +* Constructor --- End diff -- Obviously... ;) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220507#comment-15220507 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58112031 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for JedisCluster. + */ +public class JedisClusterConfig implements Function, Serializable { + private static final long serialVersionUID = 1L; + + + private Set nodes; + private int timeout; + private int maxRedirections; + private int maxTotal; + private int maxIdle; + private int minIdle; + + /** +* +* Constructor --- End diff -- Obviously... ;) > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111937 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java --- @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; + +/** + * Configuration for JedisPool. + */ +public class JedisPoolConfig implements Function, Serializable { --- End diff -- Why `Function`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111680 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for JedisCluster. + */ +public class JedisClusterConfig implements Function, Serializable { --- End diff -- Why does it implement `Function`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220502#comment-15220502 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111680 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis.common.config; + +import com.google.common.base.Preconditions; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.common.functions.Function; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.Protocol; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.HashSet; +import java.util.Set; + +/** + * Configuration for JedisCluster. + */ +public class JedisClusterConfig implements Function, Serializable { --- End diff -- Why does it implement `Function`? > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220501#comment-15220501 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111485 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; + private RedisMapper redisSinkMapper; + private RedisDataType redisDataType; + + private JedisPoolConfig jedisPoolConfig; + private JedisSentinelConfig jedisSentinelConfig; + private JedisClusterConfig jedisClusterConfig; + + private
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111485 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; + private RedisMapper redisSinkMapper; + private RedisDataType redisDataType; + + private JedisPoolConfig jedisPoolConfig; + private JedisSentinelConfig jedisSentinelConfig; + private JedisClusterConfig jedisClusterConfig; + + private RedisCommandsContainer redisCommandsContainer; + + /** +* Creates a new RedisSink that connects to the Redis Server +* +* @param jedisPoolConfig The configuration of {@link JedisPoolConfig} +* @param
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220500#comment-15220500 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111329 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; + private RedisMapper redisSinkMapper; + private RedisDataType redisDataType; + + private JedisPoolConfig jedisPoolConfig; + private JedisSentinelConfig jedisSentinelConfig; + private JedisClusterConfig jedisClusterConfig; + + private
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58111329 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; + private RedisMapper redisSinkMapper; + private RedisDataType redisDataType; + + private JedisPoolConfig jedisPoolConfig; + private JedisSentinelConfig jedisSentinelConfig; + private JedisClusterConfig jedisClusterConfig; + + private RedisCommandsContainer redisCommandsContainer; + + /** +* Creates a new RedisSink that connects to the Redis Server +* +* @param jedisPoolConfig The configuration of {@link JedisPoolConfig} +* @param
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220494#comment-15220494 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58110997 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; + private RedisMapper redisSinkMapper; + private RedisDataType redisDataType; + + private JedisPoolConfig jedisPoolConfig; + private JedisSentinelConfig jedisSentinelConfig; + private JedisClusterConfig jedisClusterConfig; + + private
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58110997 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; + private RedisMapper redisSinkMapper; + private RedisDataType redisDataType; + + private JedisPoolConfig jedisPoolConfig; + private JedisSentinelConfig jedisSentinelConfig; + private JedisClusterConfig jedisClusterConfig; + + private RedisCommandsContainer redisCommandsContainer; + + /** +* Creates a new RedisSink that connects to the Redis Server +* +* @param jedisPoolConfig The configuration of {@link JedisPoolConfig} +* @param
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220489#comment-15220489 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58110429 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; --- End diff -- For me as an redis newby, it is unclear was `additionalKey` might be. Might be good to add a comment with a short explanation/description. > Redis SInk Connector > > > Key: FLINK-3034 > URL:
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58110429 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * + * @Override + * public String getValueFromData(Tuple2 data) { + * return String.valueOf(data.f1); + * } + * } + * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder() + * .setHost(REDIS_HOST) + * .setPort(REDIS_PORT).build(); + * new RedisSink(jedisPoolConfig, new RedisExampleDataMapper()); + * } + * + * @param Type of the elements emitted by this sink + */ +public class RedisSink extends RichSinkFunction{ + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RedisSink.class); + + private String additionalKey; --- End diff -- For me as an redis newby, it is unclear was `additionalKey` might be. Might be good to add a comment with a short explanation/description. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220484#comment-15220484 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58110110 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * --- End diff -- remove this line or insert blank lines in between all methods for consistency > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58110110 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. + * + * + * Example: + * + * {@code + * + * public static class RedisExampleDataMapper implements RedisMapper>{ + * @Override + * public RedisDataTypeDescription getDataTypeDescription() { + * return new RedisDataTypeDescription(dataType, REDIS_ADDITIONAL_KEY); + * } + * @Override + * public String getKeyFromData(Tuple2 data) { + * return String.valueOf(data.f0); + * } + * --- End diff -- remove this line or insert blank lines in between all methods for consistency --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220476#comment-15220476 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58109874 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * --- End diff -- unnecessary leading blank before > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58109874 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * --- End diff -- unnecessary leading blank before --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220472#comment-15220472 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58109328 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. --- End diff -- redis => Redis > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220471#comment-15220471 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58109313 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is --- End diff -- redis => Redis > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Matthias J. Sax >Assignee: Subhankar Biswas >Priority: Minor > > Flink does not provide a sink connector for Redis. > See FLINK-3033 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58109328 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is + * configured using sentinels else use the third constructor {@link #RedisSink(JedisClusterConfig, RedisMapper)} + * which use {@link redis.clients.jedis.JedisCluster} to connect to redis cluster. --- End diff -- redis => Redis --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58109313 --- Diff: flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.redis; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig; +import org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer; +import org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +import redis.clients.jedis.exceptions.JedisException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * A sink that delivers data to a Redis channel using the Jedis client. + * + * + * When creating the sink using first constructor {@link #RedisSink(JedisPoolConfig, RedisMapper)} + * the sink will create connection using {@link redis.clients.jedis.JedisPool}. + * + * When using second constructor {@link #RedisSink(JedisSentinelConfig, RedisMapper)} the sink will create connection + * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. Use this if redis is --- End diff -- redis => Redis --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3034) Redis SInk Connector
[ https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220454#comment-15220454 ] ASF GitHub Bot commented on FLINK-3034: --- Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58108235 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,172 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download). + + Redis Sink +A class providing an interface for sending data to Redis. +The sink can use three different methods for communicating with different type of Redis environments: +1. Single Redis Server +2. Redis Cluster +3. Redis Sentinel + +This code shows how to create a sink that communicate to a single redis server: + + + +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper>{ + +@Override +public RedisDataTypeDescription getDataTypeDescription() { +return new RedisDataTypeDescription(RedisDataType.HASH, "HASH_NAME"); +} + +@Override +public String getKeyFromData(Tuple2 data) { +return data.f0; +} + +@Override +public String getValueFromData(Tuple2 data) { +return data.f1; +} +} +JedisPoolConfig conf = new JedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream stream = ...; +stream.addSink(new RedisSink >(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ + override def getDataTypeDescription: RedisDataTypeDescription = { +new RedisDataTypeDescription(RedisDataType.HASH, "HASH_NAME") + } + + override def getKeyFromData(data: (String, String)): String = data._1 + + override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new JedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example code does the same, but for Redis Cluster: + + + +{% highlight java %} + +JedisPoolConfig conf = new JedisClusterConfig.Builder() +.setNodes(new HashSet(Arrays.asList(new InetSocketAddress(5601.build(); + +DataStream stream = ...; +stream.addSink(new RedisSink >(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new JedisClusterConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example shows when the Redis environment is with Sentinels: + + + +{% highlight java %} + +JedisSentinelConfig conf = new JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build(); + +DataStream stream = ...; +stream.addSink(new RedisSink >(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This section gives a description of all the available Data types and what redis command used for that. --- End diff -- Typo: Data types => data types > Redis SInk Connector > > > Key: FLINK-3034 > URL: https://issues.apache.org/jira/browse/FLINK-3034 > Project: Flink > Issue Type: New Feature > Components:
[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/1813#discussion_r58108235 --- Diff: docs/apis/streaming/connectors/redis.md --- @@ -0,0 +1,172 @@ +--- +title: "Redis Connector" + +# Sub-level navigation +sub-nav-group: streaming +sub-nav-parent: connectors +sub-nav-pos: 6 +sub-nav-title: Redis +--- + + +This connector provides a Sink that can write to +[Redis](http://redis.io/) and also can publish data to [Redis PubSub](http://redis.io/topics/pubsub). To use this connector, add the +following dependency to your project: +{% highlight xml %} + + org.apache.flink + flink-connector-redis{{ site.scala_version_suffix }} + {{site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary distribution. See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution). + + Installing Redis +Follow the instructions from the [Redis download page](http://redis.io/download). + + Redis Sink +A class providing an interface for sending data to Redis. +The sink can use three different methods for communicating with different type of Redis environments: +1. Single Redis Server +2. Redis Cluster +3. Redis Sentinel + +This code shows how to create a sink that communicate to a single redis server: + + + +{% highlight java %} +public static class RedisExampleMapper implements RedisMapper>{ + +@Override +public RedisDataTypeDescription getDataTypeDescription() { +return new RedisDataTypeDescription(RedisDataType.HASH, "HASH_NAME"); +} + +@Override +public String getKeyFromData(Tuple2 data) { +return data.f0; +} + +@Override +public String getValueFromData(Tuple2 data) { +return data.f1; +} +} +JedisPoolConfig conf = new JedisPoolConfig.Builder().setHost("127.0.0.1").build(); + +DataStream stream = ...; +stream.addSink(new RedisSink >(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +class RedisExampleMapper extends RedisMapper[(String, String)]{ + override def getDataTypeDescription: RedisDataTypeDescription = { +new RedisDataTypeDescription(RedisDataType.HASH, "HASH_NAME") + } + + override def getKeyFromData(data: (String, String)): String = data._1 + + override def getValueFromData(data: (String, String)): String = data._2 +} +val conf = new JedisPoolConfig.Builder().setHost("127.0.0.1").build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example code does the same, but for Redis Cluster: + + + +{% highlight java %} + +JedisPoolConfig conf = new JedisClusterConfig.Builder() +.setNodes(new HashSet(Arrays.asList(new InetSocketAddress(5601.build(); + +DataStream stream = ...; +stream.addSink(new RedisSink >(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new JedisClusterConfig.Builder().setNodes(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This example shows when the Redis environment is with Sentinels: + + + +{% highlight java %} + +JedisSentinelConfig conf = new JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build(); + +DataStream stream = ...; +stream.addSink(new RedisSink >(conf, new RedisExampleMapper()); +{% endhighlight %} + + +{% highlight scala %} +val conf = new JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build() +stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper)) +{% endhighlight %} + + + +This section gives a description of all the available Data types and what redis command used for that. --- End diff -- Typo: Data types => data types --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [docs] Add first version of "Concepts" documen...
GitHub user StephanEwen opened a pull request: https://github.com/apache/flink/pull/1843 [docs] Add first version of "Concepts" documentation. I think that the Flink docs (linked from the website) need a general introduction of the concepts behind Flink. Such a thing is important for people to get a "mental model" of what Flink does. Also, most of the later documentation assumes that people know about operators, streams, parallelism, JobManagers, TaskManagers, etc. This **Concepts** page introduces these things briefly and high level, so people get a basic understanding of the who-is-who and what-is-what in Flink. *Since these pull requests don't render the SVG files, I added them here.* ![prorgam_dataflow](https://cloud.githubusercontent.com/assets/1727146/14184836/6f41aefe-f777-11e5-8eb4-eab949d1790f.png) -- ![parallel_dataflow](https://cloud.githubusercontent.com/assets/1727146/14184846/7f663e9e-f777-11e5-9cee-930fa8a95957.png) -- ![operators_tasks_chaining](https://cloud.githubusercontent.com/assets/1727146/14184849/833a4e3e-f777-11e5-8146-25343dad233b.png) -- ![processes](https://cloud.githubusercontent.com/assets/1727146/14184861/8af17aee-f777-11e5-831d-d044002d7e1b.png) -- ![tasks_slots](https://cloud.githubusercontent.com/assets/1727146/14184863/8dd6c98a-f777-11e5-8488-b50bcbf28dea.png) -- ![slot_sharing](https://cloud.githubusercontent.com/assets/1727146/14184868/916b5746-f777-11e5-9924-48ddae65e767.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StephanEwen/incubator-flink concepts Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1843.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1843 commit 3b4f9d2859684f6537d623fc871c5d1d6a1c2fcb Author: Stephan EwenDate: 2016-03-31T17:25:35Z [docs] Add first version of "concepts" documentation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2522) Integrate Streaming Api into Flink-scala-shell
[ https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220273#comment-15220273 ] ASF GitHub Bot commented on FLINK-2522: --- Github user nikste commented on a diff in the pull request: https://github.com/apache/flink/pull/1412#discussion_r58094009 --- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala --- @@ -124,7 +127,9 @@ class FlinkILoop( "org.apache.flink.api.java.operators._", "org.apache.flink.api.java.sampling._", "org.apache.flink.api.scala._", -"org.apache.flink.api.scala.utils._" +"org.apache.flink.api.scala.utils._", +"org.apache.flink.streaming.api.scala._", +"org.apache.flink.streaming.api.windowing.time._" --- End diff -- I've added `streaming.api.scala` and `windowing.time` any other default imports important for streaming? > Integrate Streaming Api into Flink-scala-shell > -- > > Key: FLINK-2522 > URL: https://issues.apache.org/jira/browse/FLINK-2522 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Reporter: Nikolaas Steenbergen >Assignee: Nikolaas Steenbergen > > startup scala shell with "-s" or "-streaming" flag to use the streaming api -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user nikste commented on a diff in the pull request: https://github.com/apache/flink/pull/1412#discussion_r58094009 --- Diff: flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala --- @@ -124,7 +127,9 @@ class FlinkILoop( "org.apache.flink.api.java.operators._", "org.apache.flink.api.java.sampling._", "org.apache.flink.api.scala._", -"org.apache.flink.api.scala.utils._" +"org.apache.flink.api.scala.utils._", +"org.apache.flink.streaming.api.scala._", +"org.apache.flink.streaming.api.windowing.time._" --- End diff -- I've added `streaming.api.scala` and `windowing.time` any other default imports important for streaming? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-204038110 So since a lot has changed in the master I basically rewrote the whole thing. Seems to work now. What default imports should be included for the streaming API? Also I noticed, that the commit history of the files is lost after moving the module from staging. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2522) Integrate Streaming Api into Flink-scala-shell
[ https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220268#comment-15220268 ] ASF GitHub Bot commented on FLINK-2522: --- Github user nikste commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-204038110 So since a lot has changed in the master I basically rewrote the whole thing. Seems to work now. What default imports should be included for the streaming API? Also I noticed, that the commit history of the files is lost after moving the module from staging. > Integrate Streaming Api into Flink-scala-shell > -- > > Key: FLINK-2522 > URL: https://issues.apache.org/jira/browse/FLINK-2522 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Reporter: Nikolaas Steenbergen >Assignee: Nikolaas Steenbergen > > startup scala shell with "-s" or "-streaming" flag to use the streaming api -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2522) Integrate Streaming Api into Flink-scala-shell
[ https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219900#comment-15219900 ] ASF GitHub Bot commented on FLINK-2522: --- Github user rawkintrevo commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-203953994 @nikste awesome work, any updates? > Integrate Streaming Api into Flink-scala-shell > -- > > Key: FLINK-2522 > URL: https://issues.apache.org/jira/browse/FLINK-2522 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Reporter: Nikolaas Steenbergen >Assignee: Nikolaas Steenbergen > > startup scala shell with "-s" or "-streaming" flag to use the streaming api -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...
Github user rawkintrevo commented on the pull request: https://github.com/apache/flink/pull/1412#issuecomment-203953994 @nikste awesome work, any updates? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219865#comment-15219865 ] Konstantin Knauf commented on FLINK-3669: - In my opionen the 1st option is the cleanest, but I can hardly imagine a usecase for which the 3rd option would be problem, so I think it is totally fine. In this case ´registerProcessingTimeTimer()´ should document that the timestamp is "ceiled" to the next interval and refer to the configuration parameter. > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 > URL: https://issues.apache.org/jira/browse/FLINK-3669 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.1 >Reporter: Aljoscha Krettek >Priority: Blocker > > Right now, the WindowOperator registers a timer at the StreamTask for every > processing-time timer that a Trigger registers. We should combine several > registered trigger timers to only register one low-level timer (timer > coalescing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask
[ https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219845#comment-15219845 ] Aljoscha Krettek commented on FLINK-3669: - Yes, you're right that's still a problem. The issue with removing the TriggerTasks is, that a) right now, the WindowOperator does not have a handle to the ScheduledFuture and b) if we combine the triggers for a certain timestamp into one trigger we cannot simply delete that TriggerTask until we know that all timers in the WindowOperator for that timestamp have been removed. By now, I see basically three solutions for our problem: 1. Create one TriggerTask per {{registerProcessingTimeTimer()}} call of the user (as is the case now), keep the {{ScheduledFuture}}, remove the timer when the user calls {{deleteProcessingTimeTimer()}} 2. Coalesce multiple {{registerProcessingTimeTimer()}} calls to only create one TriggerTask to reduce the burden on the {{ScheduledThreadPoolExecutor}}, keep track of how many user triggers we have for a timestamp, remove using the {{ScheduledFuture}} once the number drops to zero 3. Change the timer service to call {{trigger()}} of the {{WindowOperator}} on a fixed interval, say 10 ms, the logic in {{WindowOperator.trigger()}} already internally checks which triggers should fire based on the timestamp given. This would mean that the burden on {{ScheduledThreadPoolExecutor}} is constant does not depend on the number of keys or number of user timers registered but it is potentially wasteful. (when I say user above, I mean the {{Trigger}} implementation) I think I would go with option 3. the granularity can be user configurable, of course. There is, of course, a tradeoff. If the user has very large windows firing all those timers is a huge waste, but the other options require to hold a data structure to keep track of things. What do you think? > WindowOperator registers a lot of timers at StreamTask > -- > > Key: FLINK-3669 > URL: https://issues.apache.org/jira/browse/FLINK-3669 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.1 >Reporter: Aljoscha Krettek >Priority: Blocker > > Right now, the WindowOperator registers a timer at the StreamTask for every > processing-time timer that a Trigger registers. We should combine several > registered trigger timers to only register one low-level timer (timer > coalescing). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3614] Remove Non-Keyed Window Operator
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1805#issuecomment-203875675 Any thoughts on this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3614) Remove Non-Keyed Window Operator
[ https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219732#comment-15219732 ] ASF GitHub Bot commented on FLINK-3614: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1805#issuecomment-203875675 Any thoughts on this? > Remove Non-Keyed Window Operator > > > Key: FLINK-3614 > URL: https://issues.apache.org/jira/browse/FLINK-3614 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > I propose to remove the special Non-Keyed Window Operator and implement > non-parallel windows by using the standard WindowOperator with a dummy > KeySelector. > Maintaining everything for two WindowOperators is a huge burden. The > implementation is completely separate by now. For example, the Non-Keyed > window operator does not use the StateBackend for state, i.e. cannot use > RocksDB. Also, with upcoming changes (Merging/Session windows, aligned > windows) this will only increase the maintenance burden. > Also, the fast AlignedProcessingTimeWindows operators also only support the > Parallel/Keyed case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse
ZhengBowen created FLINK-3685: - Summary: Logical error in code for DateSerializer deserialize with reuse Key: FLINK-3685 URL: https://issues.apache.org/jira/browse/FLINK-3685 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.0.0 Reporter: ZhengBowen There is a logical error in the following function in DateSerializer.java when source read '-1' function is: ``` public Date deserialize(Date reuse, DataInputView source) throws IOException { long v = source.readLong(); if(v == -1L) { return null; } reuse.setTime(v); return reuse; } ``` when call this function for first time, if return null, then 'reuse' will be set null by caller; when call this function for second time,if 'v!=-1' ,reuse.setTime(v) will throw NPE. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r58029854 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala --- @@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { Literal(str.toInt) } else if (str.endsWith("f") | str.endsWith("F")) { Literal(str.toFloat) +} else if (str.endsWith(".")) { --- End diff -- Of course both are valid expressions but `1.` is simply not a valid number literal. Instead `1` would be valid. I guess you should change your grammar such that `1.abs()` will parsed into something like `functionApplication(NumberLiteral(1), abs)` and the `.` indicates the function application but is not part of the number literal token. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219703#comment-15219703 ] ASF GitHub Bot commented on FLINK-3579: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1821#discussion_r58029854 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala --- @@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { Literal(str.toInt) } else if (str.endsWith("f") | str.endsWith("F")) { Literal(str.toFloat) +} else if (str.endsWith(".")) { --- End diff -- Of course both are valid expressions but `1.` is simply not a valid number literal. Instead `1` would be valid. I guess you should change your grammar such that `1.abs()` will parsed into something like `functionApplication(NumberLiteral(1), abs)` and the `.` indicates the function application but is not part of the number literal token. > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly
[ https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219674#comment-15219674 ] ASF GitHub Bot commented on FLINK-3684: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1842 > CEP operator does not forward watermarks properly > - > > Key: FLINK-3684 > URL: https://issues.apache.org/jira/browse/FLINK-3684 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > The CEP stream operator don't emit a proper watermark when using event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3684] [cep] Add proper watermark emissi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1842 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-3684) CEP operator does not forward watermarks properly
[ https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-3684. Resolution: Fixed Fixed via e3759a5e65181040066dbb278266f2c1fa226347 > CEP operator does not forward watermarks properly > - > > Key: FLINK-3684 > URL: https://issues.apache.org/jira/browse/FLINK-3684 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > The CEP stream operator don't emit a proper watermark when using event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly
[ https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219666#comment-15219666 ] ASF GitHub Bot commented on FLINK-3684: --- Github user uce commented on the pull request: https://github.com/apache/flink/pull/1842#issuecomment-203854053 +1 to merge. > CEP operator does not forward watermarks properly > - > > Key: FLINK-3684 > URL: https://issues.apache.org/jira/browse/FLINK-3684 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > The CEP stream operator don't emit a proper watermark when using event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly
[ https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219669#comment-15219669 ] ASF GitHub Bot commented on FLINK-3684: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1842#issuecomment-203854306 Changes are trivial and local maven verify passes. Will merge it. > CEP operator does not forward watermarks properly > - > > Key: FLINK-3684 > URL: https://issues.apache.org/jira/browse/FLINK-3684 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > The CEP stream operator don't emit a proper watermark when using event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3684] [cep] Add proper watermark emissi...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/1842#issuecomment-203854053 +1 to merge. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly
[ https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219662#comment-15219662 ] ASF GitHub Bot commented on FLINK-3684: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1842 [FLINK-3684] [cep] Add proper watermark emission to CEP operators Adds watermark emission to CEP stream operators. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCEPWatermarkPropagation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1842.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1842 commit 9feb6eac17a0b90cf6f30f06adb4a7b97e2d9382 Author: Till RohrmannDate: 2016-03-31T09:35:19Z [FLINK-3684] [cep] Add proper watermark emission to CEP operators > CEP operator does not forward watermarks properly > - > > Key: FLINK-3684 > URL: https://issues.apache.org/jira/browse/FLINK-3684 > Project: Flink > Issue Type: Bug > Components: CEP >Affects Versions: 1.0.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Fix For: 1.1.0, 1.0.1 > > > The CEP stream operator don't emit a proper watermark when using event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3684) CEP operator does not forward watermarks properly
Till Rohrmann created FLINK-3684: Summary: CEP operator does not forward watermarks properly Key: FLINK-3684 URL: https://issues.apache.org/jira/browse/FLINK-3684 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.0.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.1.0, 1.0.1 The CEP stream operator don't emit a proper watermark when using event time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3684] [cep] Add proper watermark emissi...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/1842 [FLINK-3684] [cep] Add proper watermark emission to CEP operators Adds watermark emission to CEP stream operators. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCEPWatermarkPropagation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1842.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1842 commit 9feb6eac17a0b90cf6f30f06adb4a7b97e2d9382 Author: Till RohrmannDate: 2016-03-31T09:35:19Z [FLINK-3684] [cep] Add proper watermark emission to CEP operators --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---