[GitHub] flink pull request: [FLINK-3541] [Kafka Connector] Clean up workar...

2016-03-31 Thread skyahead
GitHub user skyahead opened a pull request:

https://github.com/apache/flink/pull/1846

[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsu…

…mer09

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skyahead/flink FLINK-3541

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1846


commit 49c291a5bd06f1468bcee40f03a7bbea3bb1be29
Author: Tianji Li 
Date:   2016-04-01T04:35:39Z

[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3541) Clean up workaround in FlinkKafkaConsumer09

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221142#comment-15221142
 ] 

ASF GitHub Bot commented on FLINK-3541:
---

GitHub user skyahead opened a pull request:

https://github.com/apache/flink/pull/1846

[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsu…

…mer09

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skyahead/flink FLINK-3541

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1846.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1846


commit 49c291a5bd06f1468bcee40f03a7bbea3bb1be29
Author: Tianji Li 
Date:   2016-04-01T04:35:39Z

[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09




> Clean up workaround in FlinkKafkaConsumer09 
> 
>
> Key: FLINK-3541
> URL: https://issues.apache.org/jira/browse/FLINK-3541
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> In the current {{FlinkKafkaConsumer09}} implementation, we repeatedly start a 
> new {{KafkaConsumer}} if the method {{KafkaConsumer.partitionsFor}} returns a 
> NPE. This is due to a bug with the Kafka version 0.9.0.0. See 
> https://issues.apache.org/jira/browse/KAFKA-2880. The code can be found in 
> the constructor of {{FlinkKafkaConsumer09.java:208}}.
> However, the problem is marked as fixed for version 0.9.0.1, which we also 
> use for the flink-connector-kafka. Therefore, we should be able to get rid of 
> the workaround.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3635) Potential null deference in TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221038#comment-15221038
 ] 

ASF GitHub Bot commented on FLINK-3635:
---

GitHub user skyahead opened a pull request:

https://github.com/apache/flink/pull/1845

[FLINK-3635] Potential null deference in TwitterExample#SelectEnglish…

…AndTokenizeFlatMap#flatMap

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skyahead/flink FLINK-3635

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1845.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1845


commit 48a8104361341da2eadd8c50c5761d47c64f4651
Author: Tianji Li 
Date:   2016-04-01T02:38:23Z

[FLINK-3635] Potential null deference in 
TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap




> Potential null deference in 
> TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap
> --
>
> Key: FLINK-3635
> URL: https://issues.apache.org/jira/browse/FLINK-3635
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
>   if (jsonNode.has("user") && 
> jsonNode.get("user").get("lang").asText().equals("en")) {
> {code}
> jsonNode.get("user").get("lang") may return null, leading to 
> NullPointerException.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3635] Potential null deference in Twitt...

2016-03-31 Thread skyahead
GitHub user skyahead opened a pull request:

https://github.com/apache/flink/pull/1845

[FLINK-3635] Potential null deference in TwitterExample#SelectEnglish…

…AndTokenizeFlatMap#flatMap

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/skyahead/flink FLINK-3635

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1845.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1845


commit 48a8104361341da2eadd8c50c5761d47c64f4651
Author: Tianji Li 
Date:   2016-04-01T02:38:23Z

[FLINK-3635] Potential null deference in 
TwitterExample#SelectEnglishAndTokenizeFlatMap#flatMap




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3523) Storm SpoutSplitExample fails with a ClassCastException

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220858#comment-15220858
 ] 

ASF GitHub Bot commented on FLINK-3523:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/flink/pull/1844

[FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program…

… to get rid of SplitStreamType wrapper

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/flink flink-3523-split-example

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1844


commit 8e518b548909b4604a427c3aca5fb2d33aabbb8f
Author: mjsax 
Date:   2016-03-31T23:23:30Z

[FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program to 
get rid of SplitStreamType wrapper




> Storm SpoutSplitExample fails with a ClassCastException
> ---
>
> Key: FLINK-3523
> URL: https://issues.apache.org/jira/browse/FLINK-3523
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Stephan Ewen
>Assignee: Matthias J. Sax
>
> {code}
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.storm.util.SplitStreamType cannot be cast to 
> java.lang.Integer
>   at 
> org.apache.flink.storm.wrappers.StormTuple.getInteger(StormTuple.java:152)
>   at 
> org.apache.flink.storm.split.operators.VerifyAndEnrichBolt.execute(VerifyAndEnrichBolt.java:52)
>   at 
> org.apache.flink.storm.wrappers.BoltWrapper.processElement(BoltWrapper.java:313)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3523] [Storm-Compatibility] Added Split...

2016-03-31 Thread mjsax
GitHub user mjsax opened a pull request:

https://github.com/apache/flink/pull/1844

[FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program…

… to get rid of SplitStreamType wrapper

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/flink flink-3523-split-example

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1844


commit 8e518b548909b4604a427c3aca5fb2d33aabbb8f
Author: mjsax 
Date:   2016-03-31T23:23:30Z

[FLINK-3523] [Storm-Compatibility] Added SplitStreamMapper to program to 
get rid of SplitStreamType wrapper




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-3523) Storm SpoutSplitExample fails with a ClassCastException

2016-03-31 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned FLINK-3523:
--

Assignee: Matthias J. Sax

> Storm SpoutSplitExample fails with a ClassCastException
> ---
>
> Key: FLINK-3523
> URL: https://issues.apache.org/jira/browse/FLINK-3523
> Project: Flink
>  Issue Type: Bug
>  Components: Storm Compatibility
>Reporter: Stephan Ewen
>Assignee: Matthias J. Sax
>
> {code}
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.storm.util.SplitStreamType cannot be cast to 
> java.lang.Integer
>   at 
> org.apache.flink.storm.wrappers.StormTuple.getInteger(StormTuple.java:152)
>   at 
> org.apache.flink.storm.split.operators.VerifyAndEnrichBolt.execute(VerifyAndEnrichBolt.java:52)
>   at 
> org.apache.flink.storm.wrappers.BoltWrapper.processElement(BoltWrapper.java:313)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2338) Shut down "Storm Topologies" cleanly

2016-03-31 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax closed FLINK-2338.
--
Resolution: Fixed

> Shut down "Storm Topologies" cleanly
> 
>
> Key: FLINK-2338
> URL: https://issues.apache.org/jira/browse/FLINK-2338
> Project: Flink
>  Issue Type: Improvement
>  Components: Storm Compatibility
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, it is not possible to stop a Flink streaming program in a clean 
> way. Thus, emulating Storm's "kill" command is done the "hard way" resulting 
> in the following exception shown in the log:
> org.apache.flink.runtime.client.JobExecutionException: Communication with 
> JobManager failed: Lost connection to JobManager akka://flink/user/jobmanager
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:169)
> at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:205)
> at 
> org.apache.flink.runtime.minicluster.FlinkMiniCluster.submitJobAndWait(FlinkMiniCluster.scala:195)
> at 
> org.apache.flink.streaming.util.TestStreamEnvironment$1.run(TestStreamEnvironment.java:116)
> Caused by: java.lang.Exception: Lost connection to JobManager 
> akka://flink/user/jobmanager
> at 
> org.apache.flink.runtime.client.JobClientActor.onReceive(JobClientActor.java:131)
> at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at 
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
> at akka.actor.ActorCell.invoke(ActorCell.scala:486)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> The exception is expected currently. However, a clean "kill" is preferable. 
> This can done after the new STOP signal is available 
> (https://issues.apache.org/jira/browse/FLINK-2111).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220550#comment-15220550
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-204094107
  
One additional overall comment: please use consistent JavaDoc formatting 
with respect to whitespaced and emtpy lines (sometime you have empty lines 
before parameter list; somtimes not) and markup (eg, `` vs `` -- you use 
bot). Try to be consistent; I did not comment on every single inconsistency...

Also, write all JavaDoc as complete sentences and add full stops at the 
end. 


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1813#issuecomment-204094107
  
One additional overall comment: please use consistent JavaDoc formatting 
with respect to whitespaced and emtpy lines (sometime you have empty lines 
before parameter list; somtimes not) and markup (eg, `` vs `` -- you use 
bot). Try to be consistent; I did not comment on every single inconsistency...

Also, write all JavaDoc as complete sentences and add full stops at the 
end. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220544#comment-15220544
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114600
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisDataTypeDescriptionTest {
--- End diff --

add `extends TestLogger`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114600
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescriptionTest.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.streaming.connectors.redis.RedisSinkTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RedisDataTypeDescriptionTest {
--- End diff --

add `extends TestLogger`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114545
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisSentinelConfigTest {
--- End diff --

add `extends TestLogger`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220542#comment-15220542
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114513
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+public class JedisPoolConfigTest {
--- End diff --

add `extends TestLogger`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220541#comment-15220541
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114500
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+public class JedisClusterConfigTest {
--- End diff --

add `extends TestLogger`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220543#comment-15220543
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114545
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfigTest.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class JedisSentinelConfigTest {
--- End diff --

add `extends TestLogger`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114513
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfigTest.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+public class JedisPoolConfigTest {
--- End diff --

add `extends TestLogger`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58114500
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfigTest.java
 ---
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import org.junit.Test;
+
+public class JedisClusterConfigTest {
--- End diff --

add `extends TestLogger`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220537#comment-15220537
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113913
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest {
+
+   private static RedisCluster cluster;
+   private static final String REDIS_MASTER = "master";
+   private static final List sentinels = 
Arrays.asList(getAvailablePort(), getAvailablePort());
+   private static final List group1 = 
Arrays.asList(getAvailablePort(), getAvailablePort());
+
+   private JedisSentinelPool jedisSentinelPool;
+   private JedisSentinelConfig jedisSentinelConfig;
+
+   @BeforeClass
+   public static void setUpCluster(){
+   cluster = 
RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
+   .serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+   .build();
+   cluster.start();
+   }
+
+   @Before
+   public void setUp() {
+   Set hosts = JedisUtil.sentinelHosts(cluster);
+   jedisSentinelConfig = new 
JedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+   .setSentinels(hosts).build();
+   jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+   jedisSentinelConfig.getSentinels());
+   }
+
+   @Test
+   public void testRedisSentinelOperation() {
+   RedisCommandsContainer redisContainer = 
RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+   Jedis jedis = null;
+   try{
+   jedis = jedisSentinelPool.getResource();
+   redisContainer.set("testKey", "testValue");
--- End diff --

use variables for testKey and testValue to avoid typos (in case test is 
extended later on)


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113913
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest {
+
+   private static RedisCluster cluster;
+   private static final String REDIS_MASTER = "master";
+   private static final List sentinels = 
Arrays.asList(getAvailablePort(), getAvailablePort());
+   private static final List group1 = 
Arrays.asList(getAvailablePort(), getAvailablePort());
+
+   private JedisSentinelPool jedisSentinelPool;
+   private JedisSentinelConfig jedisSentinelConfig;
+
+   @BeforeClass
+   public static void setUpCluster(){
+   cluster = 
RedisCluster.builder().sentinelPorts(sentinels).quorumSize(1)
+   .serverPorts(group1).replicationGroup(REDIS_MASTER, 1)
+   .build();
+   cluster.start();
+   }
+
+   @Before
+   public void setUp() {
+   Set hosts = JedisUtil.sentinelHosts(cluster);
+   jedisSentinelConfig = new 
JedisSentinelConfig.Builder().setMasterName(REDIS_MASTER)
+   .setSentinels(hosts).build();
+   jedisSentinelPool = new 
JedisSentinelPool(jedisSentinelConfig.getMasterName(),
+   jedisSentinelConfig.getSentinels());
+   }
+
+   @Test
+   public void testRedisSentinelOperation() {
+   RedisCommandsContainer redisContainer = 
RedisCommandsContainerBuilder.build(jedisSentinelConfig);
+   Jedis jedis = null;
+   try{
+   jedis = jedisSentinelPool.getResource();
+   redisContainer.set("testKey", "testValue");
--- End diff --

use variables for testKey and testValue to avoid typos (in case test is 
extended later on)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220535#comment-15220535
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113574
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest {
--- End diff --

add `extends TestLogger`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113574
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSentinelClusterTest.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisSentinelPool;
+import redis.embedded.RedisCluster;
+import redis.embedded.util.JedisUtil;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.flink.util.NetUtils.getAvailablePort;
+
+public class RedisSentinelClusterTest {
--- End diff --

add `extends TestLogger`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220532#comment-15220532
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113357
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private RedisDataType dataType;
+   private String additionalKey;
+
+   /**
+* Use this constructor when data type is HASH or SORTED_SET
+* @param dataType the redis data type {@link RedisDataType}
+* @param additionalKey additional key for Hash and Sorted set data type
+ */
+   public RedisDataTypeDescription(RedisDataType dataType, String 
additionalKey) {
+   this.dataType = dataType;
+   this.additionalKey = additionalKey;
+
+   if (dataType == RedisDataType.HASH || dataType == 
RedisDataType.SORTED_SET) {
+   if (additionalKey == null) {
+   throw new IllegalArgumentException("Hash and 
Sorted Set should have additional key");
+   }
+   }
+   }
+
+   /**
+* Use this constructor when data type is not HASH or SORTED_SET
+* @param dataType the redis data type {@link RedisDataType}
+ */
+   public RedisDataTypeDescription(RedisDataType dataType) {
+   this(dataType, null);
+   }
+
+   public RedisDataType getDataType() {
--- End diff --

JavaDoc


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113357
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private RedisDataType dataType;
+   private String additionalKey;
+
+   /**
+* Use this constructor when data type is HASH or SORTED_SET
+* @param dataType the redis data type {@link RedisDataType}
+* @param additionalKey additional key for Hash and Sorted set data type
+ */
+   public RedisDataTypeDescription(RedisDataType dataType, String 
additionalKey) {
+   this.dataType = dataType;
+   this.additionalKey = additionalKey;
+
+   if (dataType == RedisDataType.HASH || dataType == 
RedisDataType.SORTED_SET) {
+   if (additionalKey == null) {
+   throw new IllegalArgumentException("Hash and 
Sorted Set should have additional key");
+   }
+   }
+   }
+
+   /**
+* Use this constructor when data type is not HASH or SORTED_SET
+* @param dataType the redis data type {@link RedisDataType}
+ */
+   public RedisDataTypeDescription(RedisDataType dataType) {
+   this(dataType, null);
+   }
+
+   public RedisDataType getDataType() {
--- End diff --

JavaDoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220533#comment-15220533
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113371
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private RedisDataType dataType;
+   private String additionalKey;
+
+   /**
+* Use this constructor when data type is HASH or SORTED_SET
+* @param dataType the redis data type {@link RedisDataType}
+* @param additionalKey additional key for Hash and Sorted set data type
+ */
+   public RedisDataTypeDescription(RedisDataType dataType, String 
additionalKey) {
+   this.dataType = dataType;
+   this.additionalKey = additionalKey;
+
+   if (dataType == RedisDataType.HASH || dataType == 
RedisDataType.SORTED_SET) {
+   if (additionalKey == null) {
+   throw new IllegalArgumentException("Hash and 
Sorted Set should have additional key");
+   }
+   }
+   }
+
+   /**
+* Use this constructor when data type is not HASH or SORTED_SET
+* @param dataType the redis data type {@link RedisDataType}
+ */
+   public RedisDataTypeDescription(RedisDataType dataType) {
+   this(dataType, null);
+   }
+
+   public RedisDataType getDataType() {
+   return dataType;
+   }
+
+   public String getAdditionalKey() {
--- End diff --

JavaDoc


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220530#comment-15220530
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113261
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
--- End diff --

Why `Function`?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220528#comment-15220528
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113197
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
 ---
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+public enum RedisDataType {
--- End diff --

JavaDoc


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220527#comment-15220527
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113161
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class RedisContainer implements RedisCommandsContainer, Closeable{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Constructor
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   this.jedisPool = jedisPool;
+   }
+
+   public RedisContainer(JedisSentinelPool sentinelPool){
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+
+
+   @Override
+   public void close() throws IOException {
+   this.jedisPool.close();
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(String hashName, String key, String value) {
+   Jedis jedis = null;
+   try{
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
+   }catch (Exception e){
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to key {} and field {} error message {}",
+   key, key, e.getMessage());
+   }
+   }finally {
+   returnInstance(jedis);
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at 
key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*
+* @param listName Name of the List
+* @param valueValue to be added
+*/
+   @Override
+   public void rpush(String listName, String value) {
+   Jedis jedis = null;
+   try{
+   jedis = getInstance();
+   jedis.rpush(listName, value);
+   }catch (Exception e){
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message {}",
+   listName, e.getMessage());
+   }
+   }finally {
+   returnInstance(jedis);
+   }
+   }
+
+   /**
+* Add the specified members to the set stored at key.
+* Specified members that are already a member of this set are ignored.
+* If key does not exist, a new set is created before adding the 
specified members.
+*
+* @param setName Name of the Set
+* @param value   Value to be added
+*/
+   @Override
+   

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220529#comment-15220529
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113229
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
--- End diff --

JavaDoc


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113261
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
--- End diff --

Why `Function`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113229
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataTypeDescription.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+public class RedisDataTypeDescription implements Function, Serializable {
--- End diff --

JavaDoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113197
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisDataType.java
 ---
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.mapper;
+
+public enum RedisDataType {
--- End diff --

JavaDoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220522#comment-15220522
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112951
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class RedisContainer implements RedisCommandsContainer, Closeable{
--- End diff --

JavaDoc


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58113161
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class RedisContainer implements RedisCommandsContainer, Closeable{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisContainer.class);
+
+   private JedisPool jedisPool;
+   private JedisSentinelPool jedisSentinelPool;
+
+
+   /**
+* Constructor
+* @param jedisPool JedisPool which actually manages Jedis instances
+*/
+   public RedisContainer(JedisPool jedisPool) {
+   this.jedisPool = jedisPool;
+   }
+
+   public RedisContainer(JedisSentinelPool sentinelPool){
+   this.jedisSentinelPool = sentinelPool;
+   }
+
+
+
+   @Override
+   public void close() throws IOException {
+   this.jedisPool.close();
+   }
+
+   /**
+* Sets field in the hash stored at key to value.
+* If key does not exist, a new key holding a hash is created.
+* If field already exists in the hash, it is overwritten.
+*
+* @param hashName   Hash name
+* @param key Hash field name
+* @param value Hash value
+*/
+   @Override
+   public void hset(String hashName, String key, String value) {
+   Jedis jedis = null;
+   try{
+   jedis = getInstance();
+   jedis.hset(hashName, key, value);
+   }catch (Exception e){
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command HSET to key {} and field {} error message {}",
+   key, key, e.getMessage());
+   }
+   }finally {
+   returnInstance(jedis);
+   }
+   }
+
+   /**
+* Insert all the specified values at the tail of the list stored at 
key.
+* If key does not exist, it is created as empty list before performing 
the push operation.
+*
+* @param listName Name of the List
+* @param valueValue to be added
+*/
+   @Override
+   public void rpush(String listName, String value) {
+   Jedis jedis = null;
+   try{
+   jedis = getInstance();
+   jedis.rpush(listName, value);
+   }catch (Exception e){
+   if (LOG.isErrorEnabled()) {
+   LOG.error("Cannot send Redis message with 
command RPUSH to list {} error message {}",
+   listName, e.getMessage());
+   }
+   }finally {
+   returnInstance(jedis);
+   }
+   }
+
+   /**
+* Add the specified members to the set stored at key.
+* Specified members that are already a member of this set are ignored.
+* If key does not exist, a new set is created before adding the 
specified members.
+*
+* @param setName Name of the Set
+* @param value   Value to be added
+*/
+   @Override
+   public void sadd(String setName, String value) {
+   Jedis jedis = null;
+   try{
+   jedis = getInstance();
+   jedis.sadd(setName, value);
+   }catch (Exception e){

[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112951
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class RedisContainer implements RedisCommandsContainer, Closeable{
--- End diff --

JavaDoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112927
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+*/
+   public static RedisCommandsContainer build(JedisPoolConfig 
jedisPoolConfig) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+   
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+   
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+   
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+   JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+   jedisPoolConfig.getPort(), 
jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(),
+   jedisPoolConfig.getDatabase());
+   return new RedisContainer(jedisPool);
+   }
+
+   /**
+* Builds container for Redis Cluster environment.
+* @param config configuration for JedisCluster
+* @return container for Redis Cluster environment
+*/
+   public static RedisCommandsContainer build(JedisClusterConfig config) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+   genericObjectPoolConfig.setMaxIdle(config.getMaxIdle());
+   genericObjectPoolConfig.setMaxTotal(config.getMaxTotal());
+   genericObjectPoolConfig.setMinIdle(config.getMinIdle());
+
+   JedisCluster jedisCluster = new JedisCluster(config.getNodes(), 
config.getTimeout(),
+   config.getMaxRedirections(), genericObjectPoolConfig);
+   return new RedisClusterContainer(jedisCluster);
+   }
+
+   /**
+* Builds container for Redis Sentinel environment.
+* @param config configuration for JedisSentinel
+* @return container for Redis sentinel environment
+ */
+   public static RedisCommandsContainer build(JedisSentinelConfig config) {
--- End diff --

Check for `null`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220521#comment-15220521
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112927
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+*/
+   public static RedisCommandsContainer build(JedisPoolConfig 
jedisPoolConfig) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+   
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+   
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+   
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+   JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+   jedisPoolConfig.getPort(), 
jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(),
+   jedisPoolConfig.getDatabase());
+   return new RedisContainer(jedisPool);
+   }
+
+   /**
+* Builds container for Redis Cluster environment.
+* @param config configuration for JedisCluster
+* @return container for Redis Cluster environment
+*/
+   public static RedisCommandsContainer build(JedisClusterConfig config) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+   genericObjectPoolConfig.setMaxIdle(config.getMaxIdle());
+   genericObjectPoolConfig.setMaxTotal(config.getMaxTotal());
+   genericObjectPoolConfig.setMinIdle(config.getMinIdle());
+
+   JedisCluster jedisCluster = new JedisCluster(config.getNodes(), 
config.getTimeout(),
+   config.getMaxRedirections(), genericObjectPoolConfig);
+   return new RedisClusterContainer(jedisCluster);
+   }
+
+   /**
+* Builds container for Redis Sentinel environment.
+* @param config configuration for JedisSentinel
+* @return container for Redis sentinel environment
+ */
+   public static RedisCommandsContainer build(JedisSentinelConfig config) {
--- End diff --

Check for `null`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220518#comment-15220518
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112760
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+*/
+   public static RedisCommandsContainer build(JedisPoolConfig 
jedisPoolConfig) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
--- End diff --

check `jedisPoolConfig` for `null`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220520#comment-15220520
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112842
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+*/
+   public static RedisCommandsContainer build(JedisPoolConfig 
jedisPoolConfig) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+   
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+   
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+   
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+   JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+   jedisPoolConfig.getPort(), 
jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(),
+   jedisPoolConfig.getDatabase());
+   return new RedisContainer(jedisPool);
+   }
+
+   /**
+* Builds container for Redis Cluster environment.
+* @param config configuration for JedisCluster
+* @return container for Redis Cluster environment
+*/
+   public static RedisCommandsContainer build(JedisClusterConfig config) {
--- End diff --

check for `null`


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112842
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+*/
+   public static RedisCommandsContainer build(JedisPoolConfig 
jedisPoolConfig) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
+   
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
+   
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
+   
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
+
+   JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, 
jedisPoolConfig.getHost(),
+   jedisPoolConfig.getPort(), 
jedisPoolConfig.getTimeout(), jedisPoolConfig.getPassword(),
+   jedisPoolConfig.getDatabase());
+   return new RedisContainer(jedisPool);
+   }
+
+   /**
+* Builds container for Redis Cluster environment.
+* @param config configuration for JedisCluster
+* @return container for Redis Cluster environment
+*/
+   public static RedisCommandsContainer build(JedisClusterConfig config) {
--- End diff --

check for `null`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112760
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
+
+   /**
+* Builds container for single Redis environment.
+* @param jedisPoolConfig configuration for JedisPool
+* @return container for single Redis environment
+*/
+   public static RedisCommandsContainer build(JedisPoolConfig 
jedisPoolConfig) {
+   GenericObjectPoolConfig genericObjectPoolConfig = new 
GenericObjectPoolConfig();
--- End diff --

check `jedisPoolConfig` for `null`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220517#comment-15220517
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112656
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
--- End diff --

JavaDoc


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220515#comment-15220515
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112601
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public interface RedisCommandsContainer extends  Serializable {
--- End diff --

JavaDoc missing


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112601
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public interface RedisCommandsContainer extends  Serializable {
--- End diff --

JavaDoc missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112656
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisSentinelPool;
+
+public class RedisCommandsContainerBuilder {
--- End diff --

JavaDoc


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112455
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
--- End diff --

JavaDoc missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220513#comment-15220513
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112455
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class RedisClusterContainer implements RedisCommandsContainer, 
Closeable {
--- End diff --

JavaDoc missing


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112296
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for JedisSentinelPool.
+ */
+public class JedisSentinelConfig implements Function, Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+*
--- End diff --

Extend JavaDoc with at least one sentence.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220512#comment-15220512
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112296
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for JedisSentinelPool.
+ */
+public class JedisSentinelConfig implements Function, Serializable {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(JedisSentinelConfig.class);
+
+   private String masterName;
+   private Set sentinels;
+   private int connectionTimeout;
+   private int soTimeout;
+   private String password;
+   private int database;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+*
--- End diff --

Extend JavaDoc with at least one sentence.


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220510#comment-15220510
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112229
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for JedisSentinelPool.
+ */
+public class JedisSentinelConfig implements Function, Serializable {
--- End diff --

`Function`?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220509#comment-15220509
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112099
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for JedisPool.
+ */
+public class JedisPoolConfig implements Function, Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* Constructor
--- End diff --

yup... can be omitted.


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112229
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisSentinelConfig.java
 ---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Configuration for JedisSentinelPool.
+ */
+public class JedisSentinelConfig implements Function, Serializable {
--- End diff --

`Function`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112099
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for JedisPool.
+ */
+public class JedisPoolConfig implements Function, Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private String host;
+   private int port;
+   private int timeout;
+   private int database;
+   private String password;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+* Constructor
--- End diff --

yup... can be omitted.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220506#comment-15220506
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111937
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for JedisPool.
+ */
+public class JedisPoolConfig implements Function, Serializable {
--- End diff --

Why `Function`?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112031
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for JedisCluster.
+ */
+public class JedisClusterConfig implements Function, Serializable {
+   private static final long serialVersionUID = 1L;
+
+
+   private Set nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+*
+* Constructor
--- End diff --

Obviously... ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220507#comment-15220507
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58112031
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for JedisCluster.
+ */
+public class JedisClusterConfig implements Function, Serializable {
+   private static final long serialVersionUID = 1L;
+
+
+   private Set nodes;
+   private int timeout;
+   private int maxRedirections;
+   private int maxTotal;
+   private int maxIdle;
+   private int minIdle;
+
+   /**
+*
+* Constructor
--- End diff --

Obviously... ;)


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111937
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisPoolConfig.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for JedisPool.
+ */
+public class JedisPoolConfig implements Function, Serializable {
--- End diff --

Why `Function`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111680
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for JedisCluster.
+ */
+public class JedisClusterConfig implements Function, Serializable {
--- End diff --

Why does it implement `Function`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220502#comment-15220502
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111680
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/JedisClusterConfig.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.config;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.api.common.functions.Function;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Protocol;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Configuration for JedisCluster.
+ */
+public class JedisClusterConfig implements Function, Serializable {
--- End diff --

Why does it implement `Function`?


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220501#comment-15220501
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111485
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSentinelConfig jedisSentinelConfig;
+   private JedisClusterConfig jedisClusterConfig;
+
+   private 

[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111485
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSentinelConfig jedisSentinelConfig;
+   private JedisClusterConfig jedisClusterConfig;
+
+   private RedisCommandsContainer redisCommandsContainer;
+
+   /**
+*  Creates a new RedisSink that connects to the Redis Server
+*
+* @param jedisPoolConfig The configuration of {@link JedisPoolConfig}
+* @param 

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220500#comment-15220500
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111329
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSentinelConfig jedisSentinelConfig;
+   private JedisClusterConfig jedisClusterConfig;
+
+   private 

[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58111329
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSentinelConfig jedisSentinelConfig;
+   private JedisClusterConfig jedisClusterConfig;
+
+   private RedisCommandsContainer redisCommandsContainer;
+
+   /**
+*  Creates a new RedisSink that connects to the Redis Server
+*
+* @param jedisPoolConfig The configuration of {@link JedisPoolConfig}
+* @param 

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220494#comment-15220494
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58110997
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSentinelConfig jedisSentinelConfig;
+   private JedisClusterConfig jedisClusterConfig;
+
+   private 

[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58110997
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
+   private RedisMapper redisSinkMapper;
+   private RedisDataType redisDataType;
+
+   private JedisPoolConfig jedisPoolConfig;
+   private JedisSentinelConfig jedisSentinelConfig;
+   private JedisClusterConfig jedisClusterConfig;
+
+   private RedisCommandsContainer redisCommandsContainer;
+
+   /**
+*  Creates a new RedisSink that connects to the Redis Server
+*
+* @param jedisPoolConfig The configuration of {@link JedisPoolConfig}
+* @param 

[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220489#comment-15220489
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58110429
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
--- End diff --

For me as an redis newby, it is unclear was `additionalKey` might be. Might 
be good to add a comment with a short explanation/description.


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: 

[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58110429
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
+ * @Override
+ * public String getValueFromData(Tuple2 data) {
+ * return String.valueOf(data.f1);
+ * }
+ * }
+ * JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
+ * .setHost(REDIS_HOST)
+ * .setPort(REDIS_PORT).build();
+ *  new RedisSink(jedisPoolConfig, new 
RedisExampleDataMapper());
+ * }
+ *
+ * @param  Type of the elements emitted by this sink
+ */
+public class RedisSink extends RichSinkFunction{
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RedisSink.class);
+
+   private String additionalKey;
--- End diff --

For me as an redis newby, it is unclear was `additionalKey` might be. Might 
be good to add a comment with a short explanation/description.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220484#comment-15220484
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58110110
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
--- End diff --

remove this line or insert blank lines in between all methods for 
consistency


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58110110
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
+ *
+ * 
+ * Example:
+ *
+ * {@code
+ *
+ * public static class RedisExampleDataMapper implements 
RedisMapper>{
+ * @Override
+ * public RedisDataTypeDescription 
getDataTypeDescription() {
+ * return new RedisDataTypeDescription(dataType, 
REDIS_ADDITIONAL_KEY);
+ * }
+ * @Override
+ * public String getKeyFromData(Tuple2 data) {
+ * return String.valueOf(data.f0);
+ * }
+ *
--- End diff --

remove this line or insert blank lines in between all methods for 
consistency


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220476#comment-15220476
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58109874
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
--- End diff --

unnecessary leading blank before 


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58109874
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
--- End diff --

unnecessary leading blank before 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220472#comment-15220472
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58109328
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
--- End diff --

redis => Redis


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220471#comment-15220471
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58109313
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
--- End diff --

redis => Redis


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Matthias J. Sax
>Assignee: Subhankar Biswas
>Priority: Minor
>
> Flink does not provide a sink connector for Redis.
> See FLINK-3033



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58109328
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
+ * configured using sentinels else use the third constructor {@link 
#RedisSink(JedisClusterConfig, RedisMapper)}
+ * which use {@link redis.clients.jedis.JedisCluster} to connect to redis 
cluster.
--- End diff --

redis => Redis


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58109313
  
--- Diff: 
flink-streaming-connectors/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisClusterConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisPoolConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.config.JedisSentinelConfig;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainer;
+import 
org.apache.flink.streaming.connectors.redis.common.container.RedisCommandsContainerBuilder;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataTypeDescription;
+import 
org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
+
+import redis.clients.jedis.exceptions.JedisException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A sink that delivers data to a Redis channel using the Jedis client.
+ *
+ *  
+ * When creating the sink using first constructor {@link 
#RedisSink(JedisPoolConfig, RedisMapper)}
+ * the sink will create connection using {@link 
redis.clients.jedis.JedisPool}.
+ *
+ * When using second constructor {@link #RedisSink(JedisSentinelConfig, 
RedisMapper)} the sink will create connection
+ * using {@link redis.clients.jedis.JedisSentinelPool} to redis cluster. 
Use this if redis is
--- End diff --

redis => Redis


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3034) Redis SInk Connector

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220454#comment-15220454
 ] 

ASF GitHub Bot commented on FLINK-3034:
---

Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58108235
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,172 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis 
PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-redis{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See linking with them for cluster execution 
[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing Redis
+Follow the instructions from the [Redis download 
page](http://redis.io/download).
+
+ Redis Sink
+A class providing an interface for sending data to Redis. 
+The sink can use three different methods for communicating with different 
type of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+
+This code shows how to create a sink that communicate to a single redis 
server:
+
+
+
+{% highlight java %}
+public static class RedisExampleMapper implements 
RedisMapper>{
+
+@Override
+public RedisDataTypeDescription getDataTypeDescription() {
+return new RedisDataTypeDescription(RedisDataType.HASH, 
"HASH_NAME");
+}
+
+@Override
+public String getKeyFromData(Tuple2 data) {
+return data.f0;
+}
+
+@Override
+public String getValueFromData(Tuple2 data) {
+return data.f1;
+}
+}
+JedisPoolConfig conf = new 
JedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getDataTypeDescription: RedisDataTypeDescription = {
+new RedisDataTypeDescription(RedisDataType.HASH, "HASH_NAME")
+  }
+
+  override def getKeyFromData(data: (String, String)): String = data._1
+
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new JedisPoolConfig.Builder().setHost("127.0.0.1").build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example code does the same, but for Redis Cluster:
+
+
+
+{% highlight java %}
+
+JedisPoolConfig conf = new JedisClusterConfig.Builder()
+.setNodes(new HashSet(Arrays.asList(new 
InetSocketAddress(5601.build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new JedisClusterConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example shows when the Redis environment is with Sentinels:
+
+
+
+{% highlight java %}
+
+JedisSentinelConfig conf = new 
JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new 
JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This section gives a description of all the available Data types and what 
redis command used for that.
--- End diff --

Typo: Data types => data types


> Redis SInk Connector
> 
>
> Key: FLINK-3034
> URL: https://issues.apache.org/jira/browse/FLINK-3034
> Project: Flink
>  Issue Type: New Feature
>  Components: 

[GitHub] flink pull request: [FLINK-3034] Redis Sink Connector

2016-03-31 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/1813#discussion_r58108235
  
--- Diff: docs/apis/streaming/connectors/redis.md ---
@@ -0,0 +1,172 @@
+---
+title: "Redis Connector"
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-parent: connectors
+sub-nav-pos: 6
+sub-nav-title: Redis
+---
+
+
+This connector provides a Sink that can write to
+[Redis](http://redis.io/) and also can publish data to [Redis 
PubSub](http://redis.io/topics/pubsub). To use this connector, add the
+following dependency to your project:
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-redis{{ site.scala_version_suffix 
}}
+  {{site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary 
distribution. See linking with them for cluster execution 
[here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+
+ Installing Redis
+Follow the instructions from the [Redis download 
page](http://redis.io/download).
+
+ Redis Sink
+A class providing an interface for sending data to Redis. 
+The sink can use three different methods for communicating with different 
type of Redis environments:
+1. Single Redis Server
+2. Redis Cluster
+3. Redis Sentinel
+
+This code shows how to create a sink that communicate to a single redis 
server:
+
+
+
+{% highlight java %}
+public static class RedisExampleMapper implements 
RedisMapper>{
+
+@Override
+public RedisDataTypeDescription getDataTypeDescription() {
+return new RedisDataTypeDescription(RedisDataType.HASH, 
"HASH_NAME");
+}
+
+@Override
+public String getKeyFromData(Tuple2 data) {
+return data.f0;
+}
+
+@Override
+public String getValueFromData(Tuple2 data) {
+return data.f1;
+}
+}
+JedisPoolConfig conf = new 
JedisPoolConfig.Builder().setHost("127.0.0.1").build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+class RedisExampleMapper extends RedisMapper[(String, String)]{
+  override def getDataTypeDescription: RedisDataTypeDescription = {
+new RedisDataTypeDescription(RedisDataType.HASH, "HASH_NAME")
+  }
+
+  override def getKeyFromData(data: (String, String)): String = data._1
+
+  override def getValueFromData(data: (String, String)): String = data._2
+}
+val conf = new JedisPoolConfig.Builder().setHost("127.0.0.1").build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example code does the same, but for Redis Cluster:
+
+
+
+{% highlight java %}
+
+JedisPoolConfig conf = new JedisClusterConfig.Builder()
+.setNodes(new HashSet(Arrays.asList(new 
InetSocketAddress(5601.build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new JedisClusterConfig.Builder().setNodes(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This example shows when the Redis environment is with Sentinels:
+
+
+
+{% highlight java %}
+
+JedisSentinelConfig conf = new 
JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build();
+
+DataStream stream = ...;
+stream.addSink(new RedisSink>(conf, new 
RedisExampleMapper());
+{% endhighlight %}
+
+
+{% highlight scala %}
+val conf = new 
JedisSentinelConfig.Builder().setMasterName("master").setSentinels(...).build()
+stream.addSink(new RedisSink[(String, String)](conf, new 
RedisExampleMapper))
+{% endhighlight %}
+
+
+
+This section gives a description of all the available Data types and what 
redis command used for that.
--- End diff --

Typo: Data types => data types


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [docs] Add first version of "Concepts" documen...

2016-03-31 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/1843

[docs] Add first version of "Concepts" documentation.

I think that the Flink docs (linked from the website) need a general 
introduction of the concepts behind Flink.

Such a thing is important for people to get a "mental model" of what Flink 
does. Also, most of the later documentation assumes that people know about 
operators, streams, parallelism, JobManagers, TaskManagers, etc.

This **Concepts** page introduces these things briefly and high level, so 
people get a basic understanding of the who-is-who and what-is-what in Flink.

*Since these pull requests don't render the SVG files, I added them here.*


![prorgam_dataflow](https://cloud.githubusercontent.com/assets/1727146/14184836/6f41aefe-f777-11e5-8eb4-eab949d1790f.png)

--


![parallel_dataflow](https://cloud.githubusercontent.com/assets/1727146/14184846/7f663e9e-f777-11e5-9cee-930fa8a95957.png)

--


![operators_tasks_chaining](https://cloud.githubusercontent.com/assets/1727146/14184849/833a4e3e-f777-11e5-8146-25343dad233b.png)

--


![processes](https://cloud.githubusercontent.com/assets/1727146/14184861/8af17aee-f777-11e5-831d-d044002d7e1b.png)

--


![tasks_slots](https://cloud.githubusercontent.com/assets/1727146/14184863/8dd6c98a-f777-11e5-8488-b50bcbf28dea.png)

--


![slot_sharing](https://cloud.githubusercontent.com/assets/1727146/14184868/916b5746-f777-11e5-9924-48ddae65e767.png)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink concepts

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1843.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1843


commit 3b4f9d2859684f6537d623fc871c5d1d6a1c2fcb
Author: Stephan Ewen 
Date:   2016-03-31T17:25:35Z

[docs] Add first version of "concepts" documentation.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2522) Integrate Streaming Api into Flink-scala-shell

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220273#comment-15220273
 ] 

ASF GitHub Bot commented on FLINK-2522:
---

Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r58094009
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---
@@ -124,7 +127,9 @@ class FlinkILoop(
 "org.apache.flink.api.java.operators._",
 "org.apache.flink.api.java.sampling._",
 "org.apache.flink.api.scala._",
-"org.apache.flink.api.scala.utils._"
+"org.apache.flink.api.scala.utils._",
+"org.apache.flink.streaming.api.scala._",
+"org.apache.flink.streaming.api.windowing.time._"
--- End diff --

I've added `streaming.api.scala` and `windowing.time` any other default 
imports important for streaming?


> Integrate Streaming Api into Flink-scala-shell
> --
>
> Key: FLINK-2522
> URL: https://issues.apache.org/jira/browse/FLINK-2522
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Nikolaas Steenbergen
>
> startup scala shell with "-s" or "-streaming" flag to use the streaming api



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-03-31 Thread nikste
Github user nikste commented on a diff in the pull request:

https://github.com/apache/flink/pull/1412#discussion_r58094009
  
--- Diff: 
flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala ---
@@ -124,7 +127,9 @@ class FlinkILoop(
 "org.apache.flink.api.java.operators._",
 "org.apache.flink.api.java.sampling._",
 "org.apache.flink.api.scala._",
-"org.apache.flink.api.scala.utils._"
+"org.apache.flink.api.scala.utils._",
+"org.apache.flink.streaming.api.scala._",
+"org.apache.flink.streaming.api.windowing.time._"
--- End diff --

I've added `streaming.api.scala` and `windowing.time` any other default 
imports important for streaming?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-03-31 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-204038110
  
So since a lot has changed in the master I basically rewrote the whole 
thing. Seems to work now.

What default imports should be included for the streaming API? 

Also I noticed, that the commit history of the files is lost after moving 
the module from staging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2522) Integrate Streaming Api into Flink-scala-shell

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220268#comment-15220268
 ] 

ASF GitHub Bot commented on FLINK-2522:
---

Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-204038110
  
So since a lot has changed in the master I basically rewrote the whole 
thing. Seems to work now.

What default imports should be included for the streaming API? 

Also I noticed, that the commit history of the files is lost after moving 
the module from staging.


> Integrate Streaming Api into Flink-scala-shell
> --
>
> Key: FLINK-2522
> URL: https://issues.apache.org/jira/browse/FLINK-2522
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Nikolaas Steenbergen
>
> startup scala shell with "-s" or "-streaming" flag to use the streaming api



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2522) Integrate Streaming Api into Flink-scala-shell

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219900#comment-15219900
 ] 

ASF GitHub Bot commented on FLINK-2522:
---

Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-203953994
  
@nikste awesome work, any updates?


> Integrate Streaming Api into Flink-scala-shell
> --
>
> Key: FLINK-2522
> URL: https://issues.apache.org/jira/browse/FLINK-2522
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Reporter: Nikolaas Steenbergen
>Assignee: Nikolaas Steenbergen
>
> startup scala shell with "-s" or "-streaming" flag to use the streaming api



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2522] Streaming support for Flink-Scala...

2016-03-31 Thread rawkintrevo
Github user rawkintrevo commented on the pull request:

https://github.com/apache/flink/pull/1412#issuecomment-203953994
  
@nikste awesome work, any updates?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask

2016-03-31 Thread Konstantin Knauf (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219865#comment-15219865
 ] 

Konstantin Knauf commented on FLINK-3669:
-

In my opionen the 1st option is the cleanest, but I can hardly imagine a 
usecase for which the 3rd option would be problem, so I think it is totally 
fine. In this case ´registerProcessingTimeTimer()´ should document that the 
timestamp is "ceiled" to the next interval and refer to the configuration 
parameter.

> WindowOperator registers a lot of timers at StreamTask
> --
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.1
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every 
> processing-time timer that a Trigger registers. We should combine several 
> registered trigger timers to only register one low-level timer (timer 
> coalescing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3669) WindowOperator registers a lot of timers at StreamTask

2016-03-31 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219845#comment-15219845
 ] 

Aljoscha Krettek commented on FLINK-3669:
-

Yes, you're right that's still a problem. The issue with removing the 
TriggerTasks is, that a) right now, the WindowOperator does not have a handle 
to the ScheduledFuture and b) if we combine the triggers for a certain 
timestamp into one trigger we cannot simply delete that TriggerTask until we 
know that all timers in the WindowOperator for that timestamp have been 
removed. By now, I see basically three solutions for our problem:

1. Create one TriggerTask per {{registerProcessingTimeTimer()}} call of the 
user (as is the case now), keep the {{ScheduledFuture}}, remove the timer when 
the user calls {{deleteProcessingTimeTimer()}}
2. Coalesce multiple {{registerProcessingTimeTimer()}} calls to only create one 
TriggerTask to reduce the burden on the {{ScheduledThreadPoolExecutor}}, keep 
track of how many user triggers we have for a timestamp, remove using the 
{{ScheduledFuture}} once the number drops to zero
3. Change the timer service to call {{trigger()}} of the {{WindowOperator}} on 
a fixed interval, say 10 ms, the logic in {{WindowOperator.trigger()}} already 
internally checks which triggers should fire based on the timestamp given. This 
would mean that the burden on {{ScheduledThreadPoolExecutor}} is constant does 
not depend on the number of keys or number of user timers registered but it is 
potentially wasteful. 

(when I say user above, I mean the {{Trigger}} implementation)

I think I would go with option 3. the granularity can be user configurable, of 
course. There is, of course, a tradeoff. If the user has very large windows 
firing all those timers is a huge waste, but the other options require to hold 
a data structure to keep track of things.

What do you think?

> WindowOperator registers a lot of timers at StreamTask
> --
>
> Key: FLINK-3669
> URL: https://issues.apache.org/jira/browse/FLINK-3669
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.1
>Reporter: Aljoscha Krettek
>Priority: Blocker
>
> Right now, the WindowOperator registers a timer at the StreamTask for every 
> processing-time timer that a Trigger registers. We should combine several 
> registered trigger timers to only register one low-level timer (timer 
> coalescing).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3614] Remove Non-Keyed Window Operator

2016-03-31 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1805#issuecomment-203875675
  
Any thoughts on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3614) Remove Non-Keyed Window Operator

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219732#comment-15219732
 ] 

ASF GitHub Bot commented on FLINK-3614:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1805#issuecomment-203875675
  
Any thoughts on this?


> Remove Non-Keyed Window Operator
> 
>
> Key: FLINK-3614
> URL: https://issues.apache.org/jira/browse/FLINK-3614
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> I propose to remove the special Non-Keyed Window Operator and implement 
> non-parallel windows by using the standard WindowOperator with a dummy 
> KeySelector.
> Maintaining everything for two WindowOperators is a huge burden. The 
> implementation is completely separate by now. For example, the Non-Keyed 
> window operator does not use the StateBackend for state, i.e. cannot use 
> RocksDB. Also, with upcoming changes (Merging/Session windows, aligned 
> windows) this will only increase the maintenance burden.
> Also, the fast AlignedProcessingTimeWindows operators also only support the 
> Parallel/Keyed case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3685) Logical error in code for DateSerializer deserialize with reuse

2016-03-31 Thread ZhengBowen (JIRA)
ZhengBowen created FLINK-3685:
-

 Summary: Logical error in code for DateSerializer deserialize with 
reuse
 Key: FLINK-3685
 URL: https://issues.apache.org/jira/browse/FLINK-3685
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.0.0
Reporter: ZhengBowen


There is a logical error in the following function in DateSerializer.java when 
source read '-1'
function is:
```
public Date deserialize(Date reuse, DataInputView source) throws IOException {
long v = source.readLong();
if(v == -1L) {
return null;
}
reuse.setTime(v);
return reuse;
}
```
when call this function for first time, if return null, then 'reuse' will be 
set null by caller;
when call this function for second time,if 'v!=-1' ,reuse.setTime(v) will throw 
NPE.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)

2016-03-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1821#discussion_r58029854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 ---
@@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   Literal(str.toInt)
 } else if (str.endsWith("f") | str.endsWith("F")) {
   Literal(str.toFloat)
+} else if (str.endsWith(".")) {
--- End diff --

Of course both are valid expressions but `1.` is simply not a valid number 
literal. Instead `1` would be valid. I guess you should change your grammar 
such that `1.abs()` will parsed into something like 
`functionApplication(NumberLiteral(1), abs)` and the `.` indicates the function 
application but is not part of the number literal token.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3579) Improve String concatenation

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219703#comment-15219703
 ] 

ASF GitHub Bot commented on FLINK-3579:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1821#discussion_r58029854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
 ---
@@ -59,6 +59,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   Literal(str.toInt)
 } else if (str.endsWith("f") | str.endsWith("F")) {
   Literal(str.toFloat)
+} else if (str.endsWith(".")) {
--- End diff --

Of course both are valid expressions but `1.` is simply not a valid number 
literal. Instead `1` would be valid. I guess you should change your grammar 
such that `1.abs()` will parsed into something like 
`functionApplication(NumberLiteral(1), abs)` and the `.` indicates the function 
application but is not part of the number literal token.


> Improve String concatenation
> 
>
> Key: FLINK-3579
> URL: https://issues.apache.org/jira/browse/FLINK-3579
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> Concatenation of a String and non-String does not work properly.
> e.g. {{f0 + 42}} leads to RelBuilder Exception
> ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219674#comment-15219674
 ] 

ASF GitHub Bot commented on FLINK-3684:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1842


> CEP operator does not forward watermarks properly
> -
>
> Key: FLINK-3684
> URL: https://issues.apache.org/jira/browse/FLINK-3684
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0, 1.0.1
>
>
> The CEP stream operator don't emit a proper watermark when using event time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3684] [cep] Add proper watermark emissi...

2016-03-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1842


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3684) CEP operator does not forward watermarks properly

2016-03-31 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-3684.

Resolution: Fixed

Fixed via e3759a5e65181040066dbb278266f2c1fa226347

> CEP operator does not forward watermarks properly
> -
>
> Key: FLINK-3684
> URL: https://issues.apache.org/jira/browse/FLINK-3684
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0, 1.0.1
>
>
> The CEP stream operator don't emit a proper watermark when using event time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219666#comment-15219666
 ] 

ASF GitHub Bot commented on FLINK-3684:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1842#issuecomment-203854053
  
+1 to merge.


> CEP operator does not forward watermarks properly
> -
>
> Key: FLINK-3684
> URL: https://issues.apache.org/jira/browse/FLINK-3684
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0, 1.0.1
>
>
> The CEP stream operator don't emit a proper watermark when using event time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219669#comment-15219669
 ] 

ASF GitHub Bot commented on FLINK-3684:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1842#issuecomment-203854306
  
Changes are trivial and local maven verify passes. Will merge it.


> CEP operator does not forward watermarks properly
> -
>
> Key: FLINK-3684
> URL: https://issues.apache.org/jira/browse/FLINK-3684
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0, 1.0.1
>
>
> The CEP stream operator don't emit a proper watermark when using event time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3684] [cep] Add proper watermark emissi...

2016-03-31 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1842#issuecomment-203854053
  
+1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3684) CEP operator does not forward watermarks properly

2016-03-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219662#comment-15219662
 ] 

ASF GitHub Bot commented on FLINK-3684:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1842

[FLINK-3684] [cep] Add proper watermark emission to CEP operators

Adds watermark emission to CEP stream operators.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixCEPWatermarkPropagation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1842.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1842


commit 9feb6eac17a0b90cf6f30f06adb4a7b97e2d9382
Author: Till Rohrmann 
Date:   2016-03-31T09:35:19Z

[FLINK-3684] [cep] Add proper watermark emission to CEP operators




> CEP operator does not forward watermarks properly
> -
>
> Key: FLINK-3684
> URL: https://issues.apache.org/jira/browse/FLINK-3684
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.1.0, 1.0.1
>
>
> The CEP stream operator don't emit a proper watermark when using event time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3684) CEP operator does not forward watermarks properly

2016-03-31 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-3684:


 Summary: CEP operator does not forward watermarks properly
 Key: FLINK-3684
 URL: https://issues.apache.org/jira/browse/FLINK-3684
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.0.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.1.0, 1.0.1


The CEP stream operator don't emit a proper watermark when using event time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3684] [cep] Add proper watermark emissi...

2016-03-31 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/1842

[FLINK-3684] [cep] Add proper watermark emission to CEP operators

Adds watermark emission to CEP stream operators.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixCEPWatermarkPropagation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1842.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1842


commit 9feb6eac17a0b90cf6f30f06adb4a7b97e2d9382
Author: Till Rohrmann 
Date:   2016-03-31T09:35:19Z

[FLINK-3684] [cep] Add proper watermark emission to CEP operators




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   >