[GitHub] storm pull request: Storm 616 : Storm-jdbc connector.

2015-01-05 Thread Parth-Brahmbhatt
GitHub user Parth-Brahmbhatt opened a pull request:

https://github.com/apache/storm/pull/372

Storm 616 : Storm-jdbc connector.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-616

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #372


commit 65e9f0c814b2cddc772880042259b66194fd6fb7
Author: Parth Brahmbhatt 
Date:   2014-12-05T22:48:34Z

 STORM-586: TridentKafkaEmitter should catch updateOffsetException.

commit 5b160168c75c0e8c4c402a5e24f606dab697fbef
Author: Parth Brahmbhatt 
Date:   2015-01-06T03:14:18Z

STORM-616: Jdbc connector for storm.

commit ab9f778ae50a1e224ebdcc58e6249009fc1f91cc
Author: Parth Brahmbhatt 
Date:   2015-01-06T03:23:52Z

Merge remote-tracking branch 'upstream/master' into STORM-616

commit d260759ac203383e27668a7cb7090926029f7406
Author: Parth Brahmbhatt 
Date:   2015-01-06T03:31:05Z

STORM-616 : removing unintended changes.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (STORM-616) add storm-jdbc to list of external connectors.

2015-01-05 Thread Parth Brahmbhatt (JIRA)
Parth Brahmbhatt created STORM-616:
--

 Summary: add storm-jdbc to list of external connectors.
 Key: STORM-616
 URL: https://issues.apache.org/jira/browse/STORM-616
 Project: Apache Storm
  Issue Type: New JIRA Project
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
Priority: Minor


There have been several questions on the Apache mailing list about how to use 
Storm to write tuples to a relational database. Storm should add a JDBC 
connector to its list of external connectors, with a general implementation 
for inserting data into relational databases as part of a topology. 
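
For illustration only (this is not the storm-jdbc implementation, just a minimal 
sketch of the idea; the table, columns, JDBC URL, and tuple fields below are all 
hypothetical):

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

// Hypothetical sketch of a JDBC sink bolt: each incoming tuple is inserted into a
// relational table; tuples are acked on success and failed on SQL errors so the
// spout can replay them.
public class SimpleJdbcInsertBolt extends BaseRichBolt {
    private OutputCollector collector;
    private transient Connection connection;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // Assumed connection details; a real connector would make these configurable.
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");
        } catch (SQLException e) {
            throw new RuntimeException("Could not open JDBC connection", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        try (PreparedStatement stmt =
                     connection.prepareStatement("INSERT INTO user_events (user_id, event) VALUES (?, ?)")) {
            stmt.setLong(1, tuple.getLongByField("user_id"));
            stmt.setString(2, tuple.getStringByField("event"));
            stmt.executeUpdate();
            collector.ack(tuple);
        } catch (SQLException e) {
            collector.reportError(e);
            collector.fail(tuple);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Sink bolt: no output stream.
    }
}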



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-609) add storm-redis to storm external

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14265363#comment-14265363
 ] 

ASF GitHub Bot commented on STORM-609:
--

Github user HeartSaVioR commented on the pull request:

https://github.com/apache/storm/pull/365#issuecomment-68803852
  
@dashengju Great! Thanks for applying.


> add storm-redis to storm external
> -
>
> Key: STORM-609
> URL: https://issues.apache.org/jira/browse/STORM-609
> Project: Apache Storm
>  Issue Type: New Feature
>Affects Versions: 0.10.0
>Reporter: DashengJu
>Assignee: DashengJu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-609] Add storm-redis to storm external

2015-01-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the pull request:

https://github.com/apache/storm/pull/365#issuecomment-68803852
  
@dashengju Great! Thanks for applying.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (STORM-615) Add REST API to upload topology

2015-01-05 Thread Sriharsha Chintalapani (JIRA)
Sriharsha Chintalapani created STORM-615:


 Summary: Add REST API to upload topology
 Key: STORM-615
 URL: https://issues.apache.org/jira/browse/STORM-615
 Project: Apache Storm
  Issue Type: Bug
Reporter: Sriharsha Chintalapani
Assignee: Sriharsha Chintalapani


Add a REST API endpoint, /api/v1/submitTopology, for uploading topology jars and 
configuration.
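
If such an endpoint were added, a client could upload a jar roughly as sketched 
below. This is purely illustrative: the endpoint does not exist yet, so the request 
shape (jar bytes in the body, topology name and config as query parameters) is an 
assumption, not the eventual API.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical client for the proposed /api/v1/submitTopology endpoint.
public class SubmitTopologyClient {
    public static void main(String[] args) throws Exception {
        byte[] jar = Files.readAllBytes(Paths.get("target/my-topology.jar")); // hypothetical jar path

        String config = URLEncoder.encode("{\"topology.workers\":2}", "UTF-8");
        URL url = new URL("http://ui-host:8080/api/v1/submitTopology?name=my-topology&config=" + config);

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/java-archive");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jar); // jar bytes as the request body (assumed shape)
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}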



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-603 Log errors when required kafka param...

2015-01-05 Thread curtisallen
Github user curtisallen commented on a diff in the pull request:

https://github.com/apache/storm/pull/361#discussion_r22474338
  
--- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
@@ -41,6 +41,14 @@
     private String _topic;
 
     public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
+        // Check required parameters
+        if(conf == null) {LOG.error("conf cannot be null");}
--- End diff --

@harshach Agreed. I didn't take that approach because it would require 
significantly more changes to the spout, since `DynamicBrokersReader` doesn't 
currently reference `KafkaConfig` or `SpoutConfig`. It would also require 
modifying `DynamicBrokersReader`'s constructor to accept a `KafkaConfig` 
instance, which would break existing callers. That seemed like too big a 
change for a minor Storm release.
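
For context, here is a sketch of the alternative being discussed. It is not code 
from this pull request: an overloaded constructor added inside `DynamicBrokersReader` 
could delegate to the existing Map-based one, so legacy callers keep working while 
new callers pass a `KafkaConfig`.

// Sketch only, not part of this PR: an overload inside DynamicBrokersReader that
// delegates to the existing constructor, leaving current callers unaffected.
// The kafkaConfig.topic field access is illustrative.
public DynamicBrokersReader(Map conf, KafkaConfig kafkaConfig, String zkStr, String zkPath) {
    this(conf, zkStr, zkPath, kafkaConfig.topic);
    // KafkaConfig-based defaulting or validation could be added here
}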


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-603) storm-kafka: Log errors when missing required configuration fields

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264787#comment-14264787
 ] 

ASF GitHub Bot commented on STORM-603:
--

Github user curtisallen commented on a diff in the pull request:

https://github.com/apache/storm/pull/361#discussion_r22474338
  
--- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
@@ -41,6 +41,14 @@
     private String _topic;
 
    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
+        // Check required parameters
+        if(conf == null) {LOG.error("conf cannot be null");}
--- End diff --

@harshach Agreed. I didn't take that approach because it would require 
significantly more changes to the spout, since `DynamicBrokersReader` doesn't 
currently reference `KafkaConfig` or `SpoutConfig`. It would also require 
modifying `DynamicBrokersReader`'s constructor to accept a `KafkaConfig` 
instance, which would break existing callers. That seemed like too big a 
change for a minor Storm release.


> storm-kafka: Log errors when missing required configuration fields
> --
>
> Key: STORM-603
> URL: https://issues.apache.org/jira/browse/STORM-603
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Curtis Allen
>Assignee: Curtis Allen
>
> I was upgrading our topologies to storm-0.9.3, and this 
> [commit|https://github.com/apache/storm/commit/2596e335f27a57784a93a57823bd93dde587909f]
> introduced a change that threw me for a loop. When submitting my topology I 
> got the following error.
> {code}
> [main] ERROR storm.kafka.DynamicBrokersReader - Couldn't connect to zookeeper
> java.lang.IllegalArgumentException: Don't know how to convert null to int
>     at backtype.storm.utils.Utils.getInt(Utils.java:301) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:47) ~[gambit-storm-threads-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>     at com.pearson.gambit.threads.storm.ThreadsTopology.main(ThreadsTopology.java:45) [gambit-storm-threads-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81)
>     at com.pearson.gambit.threads.storm.ThreadsTopology.main(ThreadsTopology.java:48)
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>     at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94)
>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65)
>     ... 1 more
> Caused by: java.lang.NullPointerException
>     at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:91)
>     ... 2 more
> {code}
> It took me a while to figure out that this error stems from missing the 
> {code}backtype.storm.Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT{code} property 
> in the conf map provided to the {code}storm.kafka.DynamicBrokersReader{code} 
> constructor. It would be nice to check the required configuration parameters 
> and throw a RuntimeException if any are missing. 
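
As a caller-side workaround for the error above (a sketch; the timeout values are 
illustrative, not recommendations), the conf map handed to DynamicBrokersReader 
needs the ZooKeeper settings populated:

import backtype.storm.Config;
import java.util.Map;

// Sketch: build a conf map that contains the ZooKeeper settings DynamicBrokersReader
// reads, so the Utils.getInt(null) failure described above cannot occur.
public class BrokerReaderConfExample {
    public static Map brokerReaderConf() {
        Config conf = new Config();
        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 15000);
        return conf;
    }
}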



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-603) storm-kafka: Log errors when missing required configuration fields

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264781#comment-14264781
 ] 

ASF GitHub Bot commented on STORM-603:
--

Github user curtisallen commented on a diff in the pull request:

https://github.com/apache/storm/pull/361#discussion_r22474024
  
--- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
@@ -149,4 +158,19 @@ private Broker getBrokerHost(byte[] contents) {
         }
     }
 
+    private void validateConfig(final Map conf) {
+        if(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT) == null) {
--- End diff --

@harshach there are no defaults for these values, hence the reason for this 
pull request. The constructor for `DynamicBrokersReader` accepts a 
`java.util.Map`, which must contain the values that are verified in this 
`validateConfig` method.
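
Only the first check is visible in the hunk above; a rough sketch of what the full 
validation could look like follows. The retry-related keys and the choice to throw 
rather than only log are assumptions, not necessarily what the pull request ends up 
doing.

// Sketch; intended to live inside DynamicBrokersReader
// (requires: import backtype.storm.Config; import java.util.Map;).
private void validateConfig(final Map conf) {
    if (conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT) == null) {
        throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT + " is required");
    }
    if (conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT) == null) {
        throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT + " is required");
    }
    if (conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES) == null) {
        throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_RETRY_TIMES + " is required");
    }
    if (conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL) == null) {
        throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_RETRY_INTERVAL + " is required");
    }
}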


> storm-kafka: Log errors when missing required configuration fields
> --
>
> Key: STORM-603
> URL: https://issues.apache.org/jira/browse/STORM-603
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Curtis Allen
>Assignee: Curtis Allen
>
> I was upgrading our topologies to storm-0.9.3, and this 
> [commit|https://github.com/apache/storm/commit/2596e335f27a57784a93a57823bd93dde587909f]
> introduced a change that threw me for a loop. When submitting my topology I 
> got the following error.
> {code}
> [main] ERROR storm.kafka.DynamicBrokersReader - Couldn't connect to zookeeper
> java.lang.IllegalArgumentException: Don't know how to convert null to int
>     at backtype.storm.utils.Utils.getInt(Utils.java:301) ~[storm-core-0.9.3.jar:0.9.3]
>     at storm.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:47) ~[gambit-storm-threads-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
>     at com.pearson.gambit.threads.storm.ThreadsTopology.main(ThreadsTopology.java:45) [gambit-storm-threads-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
> Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.NullPointerException
>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81)
>     at com.pearson.gambit.threads.storm.ThreadsTopology.main(ThreadsTopology.java:48)
> Caused by: java.lang.RuntimeException: java.lang.NullPointerException
>     at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94)
>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65)
>     ... 1 more
> Caused by: java.lang.NullPointerException
>     at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:91)
>     ... 2 more
> {code}
> It took me a while to figure out that this error stems from missing the 
> {code}backtype.storm.Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT{code} property 
> in the conf map provided to the {code}storm.kafka.DynamicBrokersReader{code} 
> constructor. It would be nice to check the required configuration parameters 
> and throw a RuntimeException if any are missing. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-603 Log errors when required kafka param...

2015-01-05 Thread curtisallen
Github user curtisallen commented on a diff in the pull request:

https://github.com/apache/storm/pull/361#discussion_r22474024
  
--- Diff: external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java ---
@@ -149,4 +158,19 @@ private Broker getBrokerHost(byte[] contents) {
         }
     }
 
+    private void validateConfig(final Map conf) {
+        if(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT) == null) {
--- End diff --

@harshach there are no defaults for these values, hence the reason for this 
pull request. The constructor for `DynamicBrokersReader` accepts a 
`java.util.Map`, which must contain the values that are verified in this 
`validateConfig` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: null pointer exception

2015-01-05 Thread Bobby Evans
What version of storm are you using?  Are any of the bolts shell bolts?  There 
is a known issue where this can happen if two shell bolts share an executor, 
because they are multi-threaded. - Bobby
 

 On Sunday, January 4, 2015 10:19 PM, clay teahouse 
 wrote:
   

 Hi All,
I have the following topology: spout -> Bolt1 -> Bolt2.
Neither bolt is async or multi-threaded. Bolt2 uses an HTTP client to make 
POST/PUT requests to a web server. Both bolts ack the tuples before exiting 
execute().
The topology runs fine for a while under a load of about 50MB/minute. After 
about 10-15 minutes the topology dies with the following error. Any hint what 
the issue might be? I'd appreciate any feedback.
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.disruptor$consume_loop_STAR_$fn__759.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.NullPointerException: null
    at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__3548.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3282.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.disruptor$clojure_handler$reify__746.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
    ... 6 common frames omitted
1059203 [Thread-76-disruptor-executor[34 34]-send-queue] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.disruptor$consume_loop_STAR_$fn__759.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
Caused by: java.lang.NullPointerException: null
    at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__3548.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3282.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.disruptor$clojure_handler$reify__746.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
    ... 6 common frames omitted
1060135 [Thread-76-disruptor-executor[34 34]-send-queue] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
    at backtype.storm.daemon.worker$fn__3807$fn__3808.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.daemon.executor$mk_executor_data$fn__3273$fn__3274.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]
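
For reference, the "ack the tuples before exiting the execute" pattern described 
above would look roughly like this (not the poster's code; the web-service call is 
reduced to a stub, and the bolt's prepare() is assumed to have stored the 
OutputCollector in the collector field):

// Minimal illustration of acking inside execute(); lives inside a BaseRichBolt.
@Override
public void execute(Tuple tuple) {
    try {
        // ... issue the HTTP POST/PUT for this tuple here ...
        collector.ack(tuple);      // ack before returning from execute
    } catch (Exception e) {
        collector.reportError(e);
        collector.fail(tuple);     // fail so the spout can replay the tuple
    }
}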


   

[jira] [Commented] (STORM-607) storm-hbase HBaseMapState should support user to customize the hbase-key & hbase-qualifier

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264483#comment-14264483
 ] 

ASF GitHub Bot commented on STORM-607:
--

Github user dashengju commented on the pull request:

https://github.com/apache/storm/pull/364#issuecomment-68693496
  
@ptgoetz , can you help to review this PR?


> storm-hbase HBaseMapState should support user to customize the hbase-key & 
> hbase-qualifier
> --
>
> Key: STORM-607
> URL: https://issues.apache.org/jira/browse/STORM-607
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hbase
>Affects Versions: 0.9.3
>Reporter: DashengJu
>Assignee: DashengJu
>
> In HBaseMapState, the user can specify the hbase-qualifier via Options, and the 
> hbase-key is composed of all the keys in multiPut's List<List<Object>> keys.
> For example, if I have a stream with <deal_id, date, pv>, grouped by <deal_id, 
> date>, then the hbase-key is composed of <deal_id, date> and the hbase-qualifier 
> is pv.
> But when I want the hbase-key to be deal_id and the hbase-qualifier to be pv+date, 
> HBaseMapState cannot support this.
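
A purely hypothetical sketch of what such a customization hook could look like 
(this is not an existing storm-hbase interface; the names are made up):

import java.io.Serializable;
import java.util.List;

// Hypothetical mapper: lets the user decide how the grouped keys become the HBase
// row key and column qualifier, e.g. row key = deal_id only and qualifier = pv+date
// as requested above.
public interface HBaseRowQualifierMapper extends Serializable {
    byte[] rowKey(List<Object> keys);
    byte[] qualifier(List<Object> keys);
}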



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-607] storm-hbase HBaseMapState should s...

2015-01-05 Thread dashengju
Github user dashengju commented on the pull request:

https://github.com/apache/storm/pull/364#issuecomment-68693496
  
@ptgoetz , can you help to review this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-609) add storm-redis to storm external

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264480#comment-14264480
 ] 

ASF GitHub Bot commented on STORM-609:
--

Github user dashengju commented on the pull request:

https://github.com/apache/storm/pull/365#issuecomment-68692877
  
@HeartSaVioR, thanks for your careful review. I have addressed all of your notes.


> add storm-redis to storm external
> -
>
> Key: STORM-609
> URL: https://issues.apache.org/jira/browse/STORM-609
> Project: Apache Storm
>  Issue Type: New Feature
>Affects Versions: 0.10.0
>Reporter: DashengJu
>Assignee: DashengJu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-609] Add storm-redis to storm external

2015-01-05 Thread dashengju
Github user dashengju commented on the pull request:

https://github.com/apache/storm/pull/365#issuecomment-68692877
  
@HeartSaVioR, thanks for your careful review. I have addressed all of your notes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-609) add storm-redis to storm external

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264467#comment-14264467
 ] 

ASF GitHub Bot commented on STORM-609:
--

Github user dashengju commented on a diff in the pull request:

https://github.com/apache/storm/pull/365#discussion_r22454671
  
--- Diff: 
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
 ---
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
+    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
+
+    private static final long DEFAULT_EXPIRE_INTERVAL_MS = 8640;
+
+    private final String redisKeyPrefix;
+    private final TridentTupleMapper tupleMapper;
+    private final long expireIntervalMs;
+
+    public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, long expireIntervalMs) {
+        this.redisKeyPrefix = redisKeyPrefix;
+        this.tupleMapper = tupleMapper;
+        if (expireIntervalMs > 0) {
+            this.expireIntervalMs = expireIntervalMs;
+        } else {
+            this.expireIntervalMs = DEFAULT_EXPIRE_INTERVAL_MS;
+        }
+    }
+
+    @Override
+    public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
+                            TridentCollector collector) {
+        long expireAt = System.currentTimeMillis() + expireIntervalMs;
+
+        JedisCluster jedisCluster = null;
+        try {
+            jedisCluster = redisClusterState.getJedisCluster();
+            for (TridentTuple input : inputs) {
+                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String redisKey = key;
+                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+                    redisKey = redisKeyPrefix + redisKey;
+                }
+                String value = this.tupleMapper.getValueFromTridentTuple(input);
+
+                logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+
+                jedisCluster.set(redisKey, value);
--- End diff --

good idea, thx
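
As a usage note: an updater like the one above is typically attached to a Trident 
stream via partitionPersist. A sketch under assumptions (the state factory, tuple 
mapper instance, and spout below are placeholders not shown in this diff):

// Sketch only: wiring the StateUpdater into a Trident topology.
TridentTopology topology = new TridentTopology();
topology.newStream("events", someTridentSpout)                          // placeholder spout
        .partitionPersist(new RedisClusterState.Factory(clusterConfig), // assumed factory
                          new Fields("key", "value"),
                          new RedisClusterStateUpdater("prefix:", tupleMapper, 86400000L));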


> add storm-redis to storm external
> -
>
> Key: STORM-609
> URL: https://issues.apache.org/jira/browse/STORM-609
> Project: Apache Storm
>  Issue Type: New Feature
>Affects Versions: 0.10.0
>Reporter: DashengJu
>Assignee: DashengJu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-609] Add storm-redis to storm external

2015-01-05 Thread dashengju
Github user dashengju commented on a diff in the pull request:

https://github.com/apache/storm/pull/365#discussion_r22454671
  
--- Diff: 
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java
 ---
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
+    private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);
+
+    private static final long DEFAULT_EXPIRE_INTERVAL_MS = 8640;
+
+    private final String redisKeyPrefix;
+    private final TridentTupleMapper tupleMapper;
+    private final long expireIntervalMs;
+
+    public RedisClusterStateUpdater(String redisKeyPrefix, TridentTupleMapper tupleMapper, long expireIntervalMs) {
+        this.redisKeyPrefix = redisKeyPrefix;
+        this.tupleMapper = tupleMapper;
+        if (expireIntervalMs > 0) {
+            this.expireIntervalMs = expireIntervalMs;
+        } else {
+            this.expireIntervalMs = DEFAULT_EXPIRE_INTERVAL_MS;
+        }
+    }
+
+    @Override
+    public void updateState(RedisClusterState redisClusterState, List<TridentTuple> inputs,
+                            TridentCollector collector) {
+        long expireAt = System.currentTimeMillis() + expireIntervalMs;
+
+        JedisCluster jedisCluster = null;
+        try {
+            jedisCluster = redisClusterState.getJedisCluster();
+            for (TridentTuple input : inputs) {
+                String key = this.tupleMapper.getKeyFromTridentTuple(input);
+                String redisKey = key;
+                if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
+                    redisKey = redisKeyPrefix + redisKey;
+                }
+                String value = this.tupleMapper.getValueFromTridentTuple(input);
+
+                logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
+
+                jedisCluster.set(redisKey, value);
--- End diff --

good idea, thx


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-609) add storm-redis to storm external

2015-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264453#comment-14264453
 ] 

ASF GitHub Bot commented on STORM-609:
--

Github user dashengju commented on a diff in the pull request:

https://github.com/apache/storm/pull/365#discussion_r22454036
  
--- Diff: 
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
 ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
--- End diff --

thx


> add storm-redis to storm external
> -
>
> Key: STORM-609
> URL: https://issues.apache.org/jira/browse/STORM-609
> Project: Apache Storm
>  Issue Type: New Feature
>Affects Versions: 0.10.0
>Reporter: DashengJu
>Assignee: DashengJu
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-609] Add storm-redis to storm external

2015-01-05 Thread dashengju
Github user dashengju commented on a diff in the pull request:

https://github.com/apache/storm/pull/365#discussion_r22454036
  
--- Diff: 
external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java
 ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.redis.trident.state;
+
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+import org.apache.storm.redis.trident.mapper.TridentTupleMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import redis.clients.jedis.JedisCluster;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+/**
--- End diff --

thx


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---