[jira] [Commented] (STORM-822) As a storm developer I’d like to use the new kafka consumer API (0.8.3) to reduce dependencies and use long term supported kafka apis

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192754#comment-15192754
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55957034
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreams(Builder builder) {
+        this.topicToStream = builder.topicToStream;
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the output fields declared
+     */
+    public Fields getOutputFields(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic).getOutputFields();
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @param topic the topic for which to get the stream id
+     * @return the id of the stream to where the tuples are emitted
+     */
+    public String getStreamId(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic).getStreamId();
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+        }
+    }
+
+    void emit(SpoutOutputCollector collector, MessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
--- End diff --

The older consumer didn't call fetch again until the _waitingToEmit queue
became empty, and records were stored only in _waitingToEmit. Here, however,
they are also stored as part of the message id. The message id in turn is stored
in two data structures:
1. The pending map of the spout - it has an upper ceiling of
topology.max.spout.pending * sizeof(message id) bytes.
2. The acked map - this stores the message ids that have been acked. It is
cleared on a timer and has no upper ceiling on the number of entries
per TopicPartition.
@hmcl @harshach Please let me know if the above two are valid inferences.
Nonetheless, I am +1 on resolving this discussion later without blocking the
PR. 
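
To make the inference above concrete, here is a minimal, purely illustrative
sketch of the two structures being discussed; the class, field, and method names
are invented and this is not the actual KafkaSpout code. It shows why the pending
map stays bounded by topology.max.spout.pending entries while the acked map can
keep growing per TopicPartition until the periodic commit clears it.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of the bookkeeping described above (not the real spout).
    class SpoutBookkeepingSketch<M> {
        private final int maxSpoutPending;                           // topology.max.spout.pending
        private final Map<Long, M> pending = new HashMap<>();        // bounded by maxSpoutPending entries
        private final Map<String, List<M>> acked = new HashMap<>();  // per topic-partition, unbounded until commit

        SpoutBookkeepingSketch(int maxSpoutPending) {
            this.maxSpoutPending = maxSpoutPending;
        }

        // The spout only emits while the pending map has room, which is what caps it.
        boolean canEmit() {
            return pending.size() < maxSpoutPending;
        }

        void emitted(long tupleId, M messageId) {
            pending.put(tupleId, messageId);
        }

        // On ack, the entry moves from the bounded pending map into the acked map,
        // which only shrinks when the periodic commit fires.
        void onAck(long tupleId, String topicPartition) {
            M messageId = pending.remove(tupleId);
            acked.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(messageId);
        }

        void onCommitTimer() {
            acked.clear();
        }
    }

Under these assumptions, worst-case memory is roughly maxSpoutPending entries in
pending plus everything acked since the last commit.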


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --
>
> Key: STORM-8


Different topologies need different schedulers

2016-03-13 Thread devopts
Hi all,
I think it is an issue that all topologies can only be scheduled by the same
scheduler.
Setting the "storm.scheduler" value in my topology configuration, such as
config.put(Config.STORM_SCHEDULE, xxx.xxx), does not work.
Different topologies need to be scheduled by different schedulers.
How can this problem be solved?
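
One possible direction, sketched below under stated assumptions: nimbus reads
"storm.scheduler" from its own configuration rather than from the topology, so a
single custom scheduler registered there could dispatch each topology to a
delegate scheduler chosen from that topology's own configuration. The
per-topology key "topology.custom.scheduler" used here is invented for
illustration, and the sketch assumes the IScheduler, Topologies, TopologyDetails,
Cluster, DefaultScheduler, and EvenScheduler types from
org.apache.storm.scheduler behave as in Storm 1.x.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.storm.scheduler.Cluster;
    import org.apache.storm.scheduler.DefaultScheduler;
    import org.apache.storm.scheduler.EvenScheduler;
    import org.apache.storm.scheduler.IScheduler;
    import org.apache.storm.scheduler.Topologies;
    import org.apache.storm.scheduler.TopologyDetails;

    // Rough sketch of a nimbus-side scheduler that delegates per topology.
    public class DispatchingScheduler implements IScheduler {
        private final IScheduler defaultScheduler = new DefaultScheduler();
        private final IScheduler evenScheduler = new EvenScheduler();

        @Override
        public void prepare(Map conf) {
            defaultScheduler.prepare(conf);
            evenScheduler.prepare(conf);
        }

        @Override
        public void schedule(Topologies topologies, Cluster cluster) {
            for (TopologyDetails topology : topologies.getTopologies()) {
                // "topology.custom.scheduler" is a made-up per-topology key for this sketch.
                Object requested = topology.getConf().get("topology.custom.scheduler");
                IScheduler delegate = "even".equals(requested) ? evenScheduler : defaultScheduler;
                // Hand the delegate a view containing just this one topology.
                delegate.schedule(new Topologies(Collections.singletonMap(topology.getId(), topology)), cluster);
            }
        }
    }

With something like this, each topology could request a scheduler through its own
config while nimbus still sees a single "storm.scheduler" entry.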

Re: Issues in Storm deployment

2016-03-13 Thread vibha goyal
It worked!!

Can you please help me with how to change the storm.yaml file?


Thanks already!!


On Sun, Mar 13, 2016 at 5:55 PM, vibha goyal  wrote:

> Hi ,
>
> Thanks for prompt reply.
>
> But, I have to use on multi-node cluster, and changes in storm.yaml will
> be required.
>
> Will the below settings work?
>
>
>
> On Sun, Mar 13, 2016 at 4:15 PM, Harsha  wrote:
>
>>  Vibhay,
>>  You are using master branch which is under development to
>>  move to Java. This might be very unstable. I suggest you to
>>  use 1.x-branch in general as its getting ready for storm
>>  1.0 release. You don't need to add anything to storm.yaml ,
>>  defaults should be fine for a single node cluster.
>>
>> 1. git checkout 1.x-branch
>> 2. mvn -DskipTests clean install
>> 3. cd storm-dist/binary
>> 4. mvn clean package
>> 5. cd target
>> 6. find and unzip apache-storm-1.0.0-SNAPSHOT
>> 7. run required services
>>   bin/storm dev-zookeeper, bin/storm nimbus, bin/storm supervisor,
>>   bin/storm ui
>> 8. ./bin/storm jar
>> examples/storm-starter/storm-starter-topologies-1.0.0-SNAPSHOT.jar
>> org.apache.storm.starter.WordCountTopology wordcount
>>
>> For bit more details you can refer here
>> http://blog.harsha.io/setting-up-a-single-node-apache-storm-cluster/
>> its on a older version of storm but relevant for the latest as well.
>>
>> -Harsha
>>
>>
>>
>>
>> On Sun, Mar 13, 2016, at 01:43 PM, vibha goyal wrote:
>> > Hi,
>> >
>> > I am grad student, and I am working on Storm as a part of my course
>> > project.
>> >
>> > I have cloned the source code from git, and followed the instructions.
>> >
>> > I am using branch : 0.10.0
>> >
>> > After "mvn package", when I copy the apache-storm-.tar.gz in my
>> > home,
>> > untar it and try to run nimbus.
>> >
>> > I get error:
>> >
>> > Exception in thread "main" java.lang.ExceptionInInitializerError
>> > at
>> >
>> org.apache.storm.utils.ConfigUtils.readStormConfigImpl(ConfigUtils.java:138)
>> > at
>> > org.apache.storm.utils.ConfigUtils.readStormConfig(ConfigUtils.java:134)
>> > at org.apache.storm.command.ConfigValue.main(ConfigValue.java:27)
>> > Caused by: java.lang.RuntimeException: java.io.IOException: Found
>> > multiple
>> > storm.yaml resources. You're probably bundling the Storm jars with your
>> > topology jar.
>> >
>> [jar:file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/lib/storm-core-2.0.0-SNAPSHOT.jar!/storm.yaml,
>> > file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml]
>> > at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:372)
>> > at org.apache.storm.utils.Utils.readStormConfig(Utils.java:456)
>> > at org.apache.storm.utils.Utils.(Utils.java:173)
>> > ... 3 more
>> >
>> >
>> > I deleted the
>> "/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml"
>> > file and again tried.
>> >
>> > This time I was able to connect the nimbus and supervisors and could see
>> > the connections in UI.
>> >
>> > As soon as I tried to submit the topology with the command:
>> >
>> > storm jar examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar
>> > org.apache.storm.starter.ExclamationTopology exclamation-topology
>> >
>> > I got error:
>> >
>> > org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find
>> > leader
>> > nimbus from seed hosts [sp16-cs525-g05-01.cs.illinois.edu]. Did you
>> > specify
>> > a valid list of nimbus hosts for config nimbus.seeds?
>> >
>> > First ,I am not able to resolve this error,
>> >
>> > and second, I cannot make changes in storm.yaml as I deleted the
>> > conf/storm.yaml
>> >
>> > I am a beginner in Storm. I have been trying this for a day now. Any
>> help
>> > would be appreciated!!
>> >
>> >
>> >
>> > Thanks!
>> >
>> > Vibha Goyal
>>
>
>


Re: Issues in Storm deployment

2016-03-13 Thread vibha goyal
Hi,

Thanks for the prompt reply.

But I have to use a multi-node cluster, and changes in storm.yaml will be
required.

Will the settings below work?



On Sun, Mar 13, 2016 at 4:15 PM, Harsha  wrote:

>  Vibhay,
>  You are using master branch which is under development to
>  move to Java. This might be very unstable. I suggest you to
>  use 1.x-branch in general as its getting ready for storm
>  1.0 release. You don't need to add anything to storm.yaml ,
>  defaults should be fine for a single node cluster.
>
> 1. git checkout 1.x-branch
> 2. mvn -DskipTests clean install
> 3. cd storm-dist/binary
> 4. mvn clean package
> 5. cd target
> 6. find and unzip apache-storm-1.0.0-SNAPSHOT
> 7. run required services
>   bin/storm dev-zookeeper, bin/storm nimbus, bin/storm supervisor,
>   bin/storm ui
> 8. ./bin/storm jar
> examples/storm-starter/storm-starter-topologies-1.0.0-SNAPSHOT.jar
> org.apache.storm.starter.WordCountTopology wordcount
>
> For bit more details you can refer here
> http://blog.harsha.io/setting-up-a-single-node-apache-storm-cluster/
> its on a older version of storm but relevant for the latest as well.
>
> -Harsha
>
>
>
>
> On Sun, Mar 13, 2016, at 01:43 PM, vibha goyal wrote:
> > Hi,
> >
> > I am grad student, and I am working on Storm as a part of my course
> > project.
> >
> > I have cloned the source code from git, and followed the instructions.
> >
> > I am using branch : 0.10.0
> >
> > After "mvn package", when I copy the apache-storm-.tar.gz in my
> > home,
> > untar it and try to run nimbus.
> >
> > I get error:
> >
> > Exception in thread "main" java.lang.ExceptionInInitializerError
> > at
> >
> org.apache.storm.utils.ConfigUtils.readStormConfigImpl(ConfigUtils.java:138)
> > at
> > org.apache.storm.utils.ConfigUtils.readStormConfig(ConfigUtils.java:134)
> > at org.apache.storm.command.ConfigValue.main(ConfigValue.java:27)
> > Caused by: java.lang.RuntimeException: java.io.IOException: Found
> > multiple
> > storm.yaml resources. You're probably bundling the Storm jars with your
> > topology jar.
> >
> [jar:file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/lib/storm-core-2.0.0-SNAPSHOT.jar!/storm.yaml,
> > file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml]
> > at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:372)
> > at org.apache.storm.utils.Utils.readStormConfig(Utils.java:456)
> > at org.apache.storm.utils.Utils.(Utils.java:173)
> > ... 3 more
> >
> >
> > I deleted the "/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml"
> > file and again tried.
> >
> > This time I was able to connect the nimbus and supervisors and could see
> > the connections in UI.
> >
> > As soon as I tried to submit the topology with the command:
> >
> > storm jar examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar
> > org.apache.storm.starter.ExclamationTopology exclamation-topology
> >
> > I got error:
> >
> > org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find
> > leader
> > nimbus from seed hosts [sp16-cs525-g05-01.cs.illinois.edu]. Did you
> > specify
> > a valid list of nimbus hosts for config nimbus.seeds?
> >
> > First ,I am not able to resolve this error,
> >
> > and second, I cannot make changes in storm.yaml as I deleted the
> > conf/storm.yaml
> >
> > I am a beginner in Storm. I have been trying this for a day now. Any help
> > would be appreciated!!
> >
> >
> >
> > Thanks!
> >
> > Vibha Goyal
>


[jira] [Commented] (STORM-1235) port backtype.storm.security.auth.ReqContext-test to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192534#comment-15192534
 ] 

ASF GitHub Bot commented on STORM-1235:
---

Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/1200#issuecomment-196055599
  
@abhishekagarwal87 OK, that makes sense; I was confused by the PR title.


> port  backtype.storm.security.auth.ReqContext-test to java
> --
>
> Key: STORM-1235
> URL: https://issues.apache.org/jira/browse/STORM-1235
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Abhishek Agarwal
>  Labels: java-migration, jstorm-merger
>
> junit test migration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



Re: Issues in Storm deployment

2016-03-13 Thread Harsha
 Vibha,
 You are using the master branch, which is under development for the
 move to Java and might be very unstable. I suggest you use the
 1.x-branch, as it is getting ready for the Storm 1.0 release. You
 don't need to add anything to storm.yaml; the defaults should be
 fine for a single-node cluster.

1. git checkout 1.x-branch
2. mvn -DskipTests clean install
3. cd storm-dist/binary
4. mvn clean package
5. cd target
6. find and unzip apache-storm-1.0.0-SNAPSHOT
7. run required services
  bin/storm dev-zookeeper, bin/storm nimbus, bin/storm supervisor,
  bin/storm ui
8. ./bin/storm jar
examples/storm-starter/storm-starter-topologies-1.0.0-SNAPSHOT.jar
org.apache.storm.starter.WordCountTopology wordcount

For a bit more detail you can refer here:
http://blog.harsha.io/setting-up-a-single-node-apache-storm-cluster/
It is based on an older version of Storm but is relevant for the latest as well.

-Harsha




On Sun, Mar 13, 2016, at 01:43 PM, vibha goyal wrote:
> Hi,
> 
> I am grad student, and I am working on Storm as a part of my course
> project.
> 
> I have cloned the source code from git, and followed the instructions.
> 
> I am using branch : 0.10.0
> 
> After "mvn package", when I copy the apache-storm-.tar.gz in my
> home,
> untar it and try to run nimbus.
> 
> I get error:
> 
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at
> org.apache.storm.utils.ConfigUtils.readStormConfigImpl(ConfigUtils.java:138)
> at
> org.apache.storm.utils.ConfigUtils.readStormConfig(ConfigUtils.java:134)
> at org.apache.storm.command.ConfigValue.main(ConfigValue.java:27)
> Caused by: java.lang.RuntimeException: java.io.IOException: Found
> multiple
> storm.yaml resources. You're probably bundling the Storm jars with your
> topology jar.
> [jar:file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/lib/storm-core-2.0.0-SNAPSHOT.jar!/storm.yaml,
> file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml]
> at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:372)
> at org.apache.storm.utils.Utils.readStormConfig(Utils.java:456)
> at org.apache.storm.utils.Utils.(Utils.java:173)
> ... 3 more
> 
> 
> I deleted the "/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml"
> file and again tried.
> 
> This time I was able to connect the nimbus and supervisors and could see
> the connections in UI.
> 
> As soon as I tried to submit the topology with the command:
> 
> storm jar examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar
> org.apache.storm.starter.ExclamationTopology exclamation-topology
> 
> I got error:
> 
> org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find
> leader
> nimbus from seed hosts [sp16-cs525-g05-01.cs.illinois.edu]. Did you
> specify
> a valid list of nimbus hosts for config nimbus.seeds?
> 
> First ,I am not able to resolve this error,
> 
> and second, I cannot make changes in storm.yaml as I deleted the
> conf/storm.yaml
> 
> I am a beginner in Storm. I have been trying this for a day now. Any help
> would be appreciated!!
> 
> 
> 
> Thanks!
> 
> Vibha Goyal


Issues in Storm deployment

2016-03-13 Thread vibha goyal
Hi,

I am a grad student, and I am working on Storm as part of my course project.

I have cloned the source code from git and followed the instructions.

I am using branch: 0.10.0

After "mvn package", I copy the apache-storm-.tar.gz to my home directory,
untar it, and try to run nimbus.

I get this error:

Exception in thread "main" java.lang.ExceptionInInitializerError
at
org.apache.storm.utils.ConfigUtils.readStormConfigImpl(ConfigUtils.java:138)
at org.apache.storm.utils.ConfigUtils.readStormConfig(ConfigUtils.java:134)
at org.apache.storm.command.ConfigValue.main(ConfigValue.java:27)
Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple
storm.yaml resources. You're probably bundling the Storm jars with your
topology jar.
[jar:file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/lib/storm-core-2.0.0-SNAPSHOT.jar!/storm.yaml,
file:/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml]
at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:372)
at org.apache.storm.utils.Utils.readStormConfig(Utils.java:456)
at org.apache.storm.utils.Utils.<clinit>(Utils.java:173)
... 3 more


I deleted the "/home/vgoyal5/apache-storm-2.0.0-SNAPSHOT/conf/storm.yaml"
file and tried again.

This time I was able to connect to the nimbus and supervisors and could see
the connections in the UI.

As soon as I tried to submit the topology with the command:

storm jar examples/storm-starter/target/storm-starter-2.0.0-SNAPSHOT.jar
org.apache.storm.starter.ExclamationTopology exclamation-topology

I got error:

org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader
nimbus from seed hosts [sp16-cs525-g05-01.cs.illinois.edu]. Did you specify
a valid list of nimbus hosts for config nimbus.seeds?

First, I am not able to resolve this error,

and second, I cannot make changes to storm.yaml as I deleted
conf/storm.yaml.

I am a beginner in Storm. I have been trying this for a day now. Any help
would be appreciated!!



Thanks!

Vibha Goyal


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55942961
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);
+
+    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+    private static SupervisorUtils _instance = INSTANCE;
+
+    public static void setInstance(SupervisorUtils u) {
+        _instance = u;
+    }
+
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
+    public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
+            final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
+        if (StringUtils.isBlank(user)) {
+            throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
+        }
+        String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+        String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+        String wl;
+        if (StringUtils.isNotBlank(wlinitial)) {
+            wl = wlinitial;
+        } else {
+            wl = stormHome + "/bin/worker-launcher";
+        }
+        List<String> commands = new ArrayList<>();
+        commands.add(wl);
+        commands.add(user);
+        commands.addAll(args);
+        LOG.info("Running as user: {} command: {}", user, commands);
+        return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
+    }
+
+    public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
+            throws IOException {
+        int ret = 0;
+        Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null);
+        if (StringUtils.isNotBlank(logPreFix))
+            Utils.readAndLogStream(logPreFix, process.getInputStream());
+        try {
+            process.waitFor();
+        } catch (InterruptedException e) {
+            LOG.info("{} interrupted.", logPreFix);
+        }
+        ret = process.exitValue();
+        return ret;
+    }
+
+    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
+        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+            String logPrefix = "setup conf for " + dir;
+            List<String> commands = new ArrayList<>();
+            commands.add("code-dir");
+            commands.add(dir);
+            workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+        }
+    }
+
+    public static void rmrAsUser(Map conf, String id, String path) throws IOException {
+        String user = Utils.getFileOwner(path);
+        String logPreFix = "rmr " + id;
+        List

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55942818
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
+
+    private LocalState localState;
+    private SupervisorData supervisorData;
+    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
+
+    private class ProcessExitCallback implements Utils.ExitCodeCallable {
+        private final String logPrefix;
+        private final String workerId;
+
+        public ProcessExitCallback(String logPrefix, String workerId) {
+            this.logPrefix = logPrefix;
+            this.workerId = workerId;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} exited with code: {}", logPrefix, exitCode);
+            supervisorData.getDeadWorkers().add(workerId);
+            return null;
+        }
+    }
+
+    public SyncProcessEvent() {
+
+    }
+
+    public SyncProcessEvent(SupervisorData supervisorData) {
+        init(supervisorData);
+    }
+
+    //TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java
+    public void init(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+        this.localState = supervisorData.getLocalState();
+    }
+
+    /**
+     * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
+     * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
+     * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait
+     * for workers launch
+     */
+    @Override
+    public void run() {
+        LOG.debug("Syncing processes");
+        try {
+            Map conf = supervisorData.getConf();
+            Map assignedExecutors = localState.getLocalAssignmentsMap()

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55942826
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+    private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class);
+
+    private LocalState localState;
+    private SupervisorData supervisorData;
+    public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1);
+
+    private class ProcessExitCallback implements Utils.ExitCodeCallable {
+        private final String logPrefix;
+        private final String workerId;
+
+        public ProcessExitCallback(String logPrefix, String workerId) {
+            this.logPrefix = logPrefix;
+            this.workerId = workerId;
+        }
+
+        @Override
+        public Object call() throws Exception {
+            return null;
+        }
+
+        @Override
+        public Object call(int exitCode) {
+            LOG.info("{} exited with code: {}", logPrefix, exitCode);
+            supervisorData.getDeadWorkers().add(workerId);
+            return null;
+        }
+    }
+
+    public SyncProcessEvent() {
+
+    }
+
+    public SyncProcessEvent(SupervisorData supervisorData) {
+        init(supervisorData);
+    }
+
+    //TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java
+    public void init(SupervisorData supervisorData) {
+        this.supervisorData = supervisorData;
+        this.localState = supervisorData.getLocalState();
+    }
+
+    /**
+     * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file -
+     * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new
+     * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait
+     * for workers launch
+     */
+    @Override
+    public void run() {
+        LOG.debug("Syncing processes");
+        try {
+            Map conf = supervisorData.getConf();
+            Map assignedExecutors = localState.getLocalAssignmentsMap()

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55942812
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended for the local supervisor, so we will remove it after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. To kill are those in allocated that are dead or disallowed.
+ * 2. Kill the ones that should be dead - read pids, kill -9 and individually remove file -
+ *    rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log).
+ * 3. Of the rest, figure out what assignments aren't yet satisfied.
+ * 4. Generate new worker ids, write new "approved workers" to LS.
+ * 5. Create local dir for worker id.
+ * 6. Launch new workers (give worker-id, port, and supervisor-id).
+ * 7. Wait for workers to launch.
+ */
+@Override
+public void run() {
+LOG.debug("Syncing processes");
+try {
+Map<String, Object> conf = supervisorData.getConf();
+Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap()

[jira] [Commented] (STORM-822) As a storm developer I’d like to use the new kafka consumer API (0.8.3) to reduce dependencies and use long term supported kafka apis

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192509#comment-15192509
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55942709
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map<String, KafkaSpoutStream> topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List<String> getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

@abhishekagarwal87 Yes, with the new consumer API this is limited by 
max.partition.fetch.bytes, so the maximum will be no. of partitions x 
max.partition.fetch.bytes. Given that we ask users to set the parallelism of 
the KafkaSpout to the number of partitions in a topic, a single instance of the 
Kafka spout will buffer at most partitions * max.partition.fetch.bytes, and on 
average max.partition.fetch.bytes per spout instance (if users configure 
parallelism == topic.partitions).
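
For illustration, a rough back-of-the-envelope sketch of the bound described 
above; the partition count and fetch size below are made-up example values, not 
settings taken from any real topology:

    public class SpoutMemoryBoundSketch {
        public static void main(String[] args) {
            int partitions = 8;                        // no. of partitions in the topic
            long maxPartitionFetchBytes = 1_048_576L;  // max.partition.fetch.bytes (1 MB)
            int spoutParallelism = partitions;         // recommended: parallelism == partitions

            // worst case: one spout instance ends up owning every partition
            long worstCasePerSpout = partitions * maxPartitionFetchBytes;
            // average case when parallelism == partitions: roughly one partition each
            long averagePerSpout = worstCasePerSpout / spoutParallelism;

            System.out.printf("worst case per spout: %d bytes, average per spout: %d bytes%n",
                    worstCasePerSpout, averagePerSpout);
        }
    }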


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-kafka
>Reporter: Thomas Becker
>Assignee: Hugo Louro

[GitHub] storm pull request: STORM-822: Kafka Spout New Consumer API

2016-03-13 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55942709
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map<String, KafkaSpoutStream> topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List<String> getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

@abhishekagarwal87 Yes, with the new consumer API this is limited by 
max.partition.fetch.bytes, so the maximum will be no. of partitions x 
max.partition.fetch.bytes. Given that we ask users to set the parallelism of 
the KafkaSpout to the number of partitions in a topic, a single instance of the 
Kafka spout will buffer at most partitions * max.partition.fetch.bytes, and on 
average max.partition.fetch.bytes per spout instance (if users configure 
parallelism == topic.partitions).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-822: Kafka Spout New Consumer API

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55942561
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map<String, KafkaSpoutStream> topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List<String> getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

If my understanding is correct, that too is bounded by 
KafkaConfig.fetchSizeBytes (1 MB by default), and the messages are not stored in 
the buffer for long; they are emitted at the rate nextTuple is called. In this 
code, message ids are not cleared from the state after the ack; they are removed 
only when the commit timer expires, so message ids will now be stored in memory 
for longer.
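
A minimal sketch of the retention pattern being pointed out, assuming a 
hypothetical buffer keyed by offset; this is illustrative only and not the 
actual KafkaSpout bookkeeping:

    import java.util.TreeMap;

    class AckedMessageIdBuffer {
        // offset -> message id; entries survive the ack and are only dropped on commit
        private final TreeMap<Long, Object> ackedByOffset = new TreeMap<>();

        void onAck(long offset, Object messageId) {
            ackedByOffset.put(offset, messageId);   // acking alone does not free the entry
        }

        long onCommitTimer() {
            // only when the commit timer fires are the ids released
            long committedUpTo = ackedByOffset.isEmpty() ? -1L : ackedByOffset.lastKey();
            ackedByOffset.clear();
            return committedUpTo;
        }
    }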


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-822) As a storm developer I’d like to use the new kafka consumer API (0.8.3) to reduce dependencies and use long term supported kafka apis

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192505#comment-15192505
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55942561
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map<String, KafkaSpoutStream> topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List<String> getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

If my understanding is correct, that too is bounded by 
KafkaConfig.fetchSizeBytes (1 MB by default), and the messages are not stored in 
the buffer for long; they are emitted at the rate nextTuple is called. In this 
code, message ids are not cleared from the state after the ack; they are removed 
only when the commit timer expires, so message ids will now be stored in memory 
for longer.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-kafka
>Reporter: Thomas Becker
>Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-956) When the execute() or nextTuple() hang on external resources, stop the Worker's heartbeat

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192494#comment-15192494
 ] 

ASF GitHub Bot commented on STORM-956:
--

Github user srdo commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196028832
  
I don't want this merged yet, just posted to get feedback :)


> When the execute() or nextTuple() hang on external resources, stop the 
> Worker's heartbeat
> -
>
> Key: STORM-956
> URL: https://issues.apache.org/jira/browse/STORM-956
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Chuanlei Ni
>Assignee: Chuanlei Ni
>Priority: Minor
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> Sometimes the worker threads produced by mk-threads in executor.clj hang on 
> external resources or for other unknown reasons. This makes the workers stop 
> processing tuples. I think it is better to kill such a worker to resolve the 
> "hang". I plan to:
> 1. like `setup-ticks`, send a system-tick to the receive-queue
> 2. have the tuple-action-fn handle this system-tick and remember the time it 
> processed this tuple in the executor-data
> 3. when the worker does its local heartbeat, check the time the executor wrote 
> to executor-data. If that time is far from the current time (for example, 3 
> minutes), the worker does not do the heartbeat, so the supervisor can deal 
> with the problem.
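
A minimal sketch of the staleness check the plan describes, assuming a 
hypothetical executor-data map that records when each executor last processed a 
system tick (the names lastTickProcessedMs and HANG_THRESHOLD_MS are 
illustrative only):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class ExecutorHangCheck {
        static final long HANG_THRESHOLD_MS = 3 * 60 * 1000;  // e.g. 3 minutes

        final Map<String, Long> lastTickProcessedMs = new ConcurrentHashMap<>();

        // called from the tuple-action path whenever a system tick is processed
        void onSystemTick(String executorId) {
            lastTickProcessedMs.put(executorId, System.currentTimeMillis());
        }

        // called from the worker's local heartbeat: withhold the heartbeat if any executor is stale
        boolean shouldHeartbeat() {
            long now = System.currentTimeMillis();
            for (long last : lastTickProcessedMs.values()) {
                if (now - last > HANG_THRESHOLD_MS) {
                    return false;  // a hung executor: skip the heartbeat so the supervisor reacts
                }
            }
            return true;
        }
    }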



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-13 Thread srdo
Github user srdo commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196028832
  
I don't want this merged yet, just posted to get feedback :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-13 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/storm/pull/1209

STORM-956: When the execute() or nextTuple() hang on external resources, 
stop the Worker's heartbeat

The previous PR at https://github.com/apache/storm/pull/647 doesn't look 
active anymore. Having Storm tell you which components are backing up would 
still be a nice feature to have.

I've taken a look at implementing the suggestions from the previous PR, but 
I have a few questions.

The previous discussion seemed to point toward shutting down the worker 
when an executor is hanging. I'm guessing there's no nice way to just restart 
the hanging executors? Is it sufficient to call shutdown on the worker object 
from do-executor-heartbeats?

I'm not really sure what Constants/SYSTEM_EXECUTOR_ID is for. Should it be 
ignored when checking for hanging executors?

I'm hoping to add the zookeeper/metrics logging and shutdown functionality 
soon if this PR looks like it's going in the right direction.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/storm STORM-956

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1209


commit c0d1c4ef6ae0d1e144f5af85174d68d5a93eb06a
Author: chuanlei 
Date:   2015-07-22T07:37:28Z

stop worker heartbeat, when the executor threads hang-on

commit 16980a3e4e015865348afee7661157cc9a21525a
Author: chuanlei 
Date:   2015-07-22T08:55:39Z

add the setup-check! to mk-threads

commit 9884c578fe8fa85197b1e5d4118598425160bb3f
Author: Stig Døssing 
Date:   2016-03-13T14:57:27Z

Merge branch 'master' of https://github.com/apache/storm into STORM-956

commit 9dd030396b0d921f25c5269e17c58b649387211d
Author: Stig Døssing 
Date:   2016-03-13T18:58:29Z

STORM-956: Add support for warning about hanging executors




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-956) When the execute() or nextTuple() hang on external resources, stop the Worker's heartbeat

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192484#comment-15192484
 ] 

ASF GitHub Bot commented on STORM-956:
--

GitHub user srdo opened a pull request:

https://github.com/apache/storm/pull/1209

STORM-956: When the execute() or nextTuple() hang on external resources, 
stop the Worker's heartbeat

The previous PR at https://github.com/apache/storm/pull/647 doesn't look 
active anymore. Having Storm tell you which components are backing up would 
still be a nice feature to have.

I've taken a look at implementing the suggestions from the previous PR, but 
I have a few questions.

The previous discussion seemed to point toward shutting down the worker 
when an executor is hanging. I'm guessing there's no nice way to just restart 
the hanging executors? Is it sufficient to call shutdown on the worker object 
from do-executor-heartbeats?

I'm not really sure what Constants/SYSTEM_EXECUTOR_ID is for. Should it be 
ignored when checking for hanging executors?

I'm hoping to add the zookeeper/metrics logging and shutdown functionality 
soon if this PR looks like it's going in the right direction.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/storm STORM-956

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1209.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1209


commit c0d1c4ef6ae0d1e144f5af85174d68d5a93eb06a
Author: chuanlei 
Date:   2015-07-22T07:37:28Z

stop worker heartbeat, when the executor threads hang-on

commit 16980a3e4e015865348afee7661157cc9a21525a
Author: chuanlei 
Date:   2015-07-22T08:55:39Z

add the setup-check! to mk-threads

commit 9884c578fe8fa85197b1e5d4118598425160bb3f
Author: Stig Døssing 
Date:   2016-03-13T14:57:27Z

Merge branch 'master' of https://github.com/apache/storm into STORM-956

commit 9dd030396b0d921f25c5269e17c58b649387211d
Author: Stig Døssing 
Date:   2016-03-13T18:58:29Z

STORM-956: Add support for warning about hanging executors




> When the execute() or nextTuple() hang on external resources, stop the 
> Worker's heartbeat
> -
>
> Key: STORM-956
> URL: https://issues.apache.org/jira/browse/STORM-956
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Chuanlei Ni
>Assignee: Chuanlei Ni
>Priority: Minor
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> Sometimes the worker threads produced by mk-threads in executor.clj hang on 
> external resources or for other unknown reasons. This makes the workers stop 
> processing tuples. I think it is better to kill such a worker to resolve the 
> "hang". I plan to:
> 1. like `setup-ticks`, send a system-tick to the receive-queue
> 2. have the tuple-action-fn handle this system-tick and remember the time it 
> processed this tuple in the executor-data
> 3. when the worker does its local heartbeat, check the time the executor wrote 
> to executor-data. If that time is far from the current time (for example, 3 
> minutes), the worker does not do the heartbeat, so the supervisor can deal 
> with the problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-1030) Hive Connector Fixes

2016-03-13 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated STORM-1030:
--
Priority: Blocker  (was: Major)

> Hive Connector Fixes
> 
>
> Key: STORM-1030
> URL: https://issues.apache.org/jira/browse/STORM-1030
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-hive
>Reporter: Sriharsha Chintalapani
>Assignee: Sriharsha Chintalapani
>Priority: Blocker
> Fix For: 1.0.0
>
>
> 1. Schedule Hive transaction heartbeats outside of execute method.
> 2. Fix retiring idleWriters
> 3. Do not call flush if there is no data added to a txnbatch
> 4. Catch any exception and abort transaction.
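
As a rough sketch of point 1 above, the heartbeats can be driven by a background 
timer instead of the execute() path; heartbeatAllWriters() is a hypothetical 
placeholder for whatever keeps the open transaction batches alive:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class HiveHeartbeatSketch {
        private final ScheduledExecutorService heartbeatTimer =
                Executors.newSingleThreadScheduledExecutor();

        void start(int intervalSecs) {
            // heartbeats run on their own thread, so a slow or idle execute() cannot starve them
            heartbeatTimer.scheduleAtFixedRate(this::heartbeatAllWriters,
                    intervalSecs, intervalSecs, TimeUnit.SECONDS);
        }

        private void heartbeatAllWriters() {
            // placeholder: iterate the open writers/txn batches and send heartbeats
        }

        void stop() {
            heartbeatTimer.shutdown();
        }
    }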



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-1604) Delayed transition should handle NotALeaderException

2016-03-13 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated STORM-1604:
--
Priority: Blocker  (was: Major)

> Delayed transition should handle NotALeaderException
> 
>
> Key: STORM-1604
> URL: https://issues.apache.org/jira/browse/STORM-1604
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 1.0.0
>Reporter: Parth Brahmbhatt
>Assignee: Parth Brahmbhatt
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Currently, if an action (kill, rebalance) is scheduled with a delay, nimbus 
> stores the state in zookeeper and then schedules a delayed event to do the 
> final transition. If during this wait time the leader nimbus loses leadership, 
> then when the delayed operation is executed it receives a NotALeaderException, 
> which it does not handle, causing nimbus to die. We should catch the exception 
> and ignore it.
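
A minimal illustration of the proposed catch-and-ignore behaviour; the 
NotALeaderException stand-in and the shape of the delayed callback are 
assumptions for the sketch, not the actual nimbus code:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class DelayedTransitionSketch {
        // stand-in for the real exception type; illustrative only
        static class NotALeaderException extends RuntimeException {}

        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        void scheduleTransition(Runnable finalTransition, long delaySecs) {
            timer.schedule(() -> {
                try {
                    finalTransition.run();  // final kill/rebalance transition
                } catch (NotALeaderException e) {
                    // leadership was lost while the delay was pending: swallow the exception
                    // instead of letting it kill nimbus; the new leader resumes from the
                    // state already stored in zookeeper
                    System.err.println("No longer the leader, ignoring delayed transition");
                }
            }, delaySecs, TimeUnit.SECONDS);
        }
    }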



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-822) As a storm developer I’d like to use the new kafka consumer API (0.8.3) to reduce dependencies and use long term supported kafka apis

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192462#comment-15192462
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55941248
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map<String, KafkaSpoutStream> topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List<String> getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

@abhishekagarwal87 it does keep both message and offset 
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L54
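
For readers following along, a tiny illustration of the point: the old spout's 
bookkeeping holds the emitted message payload together with its offset until the 
offset is acked. The class and field names here are illustrative, not the actual 
PartitionManager internals:

    import java.util.SortedMap;
    import java.util.TreeMap;

    class PendingMessagesSketch {
        // offset -> raw message bytes, kept until the offset is acked
        private final SortedMap<Long, byte[]> pending = new TreeMap<>();

        void emitted(long offset, byte[] message) { pending.put(offset, message); }

        void acked(long offset) { pending.remove(offset); }

        long bytesHeld() {
            return pending.values().stream().mapToLong(b -> b.length).sum();
        }
    }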


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-kafka
>Reporter: Thomas Becker
>Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-822: Kafka Spout New Consumer API

2016-03-13 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55941248
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map<String, KafkaSpoutStream> topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic to for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List<String> getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

@abhishekagarwal87 it does keep both message and offset 
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L54


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192457#comment-15192457
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55941142
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map<String, Object> config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

@satishd aren't the table name and cf specific to HBase? In that case a config 
method can take a map and do its validation, and that validation will be 
specific to HBaseWindowsStoreFactory.
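
A sketch of the alternative being suggested: instead of HBase-specific 
constructor arguments, a validation hook on the factory checks the keys it cares 
about in a plain config map. The key names used here are hypothetical, not part 
of any Storm API:

    import java.util.Map;

    class HBaseWindowConfigValidation {
        static void validate(Map<String, Object> config) {
            require(config, "hbase.table.name");
            require(config, "hbase.column.family");
            require(config, "hbase.column.qualifier");
        }

        private static void require(Map<String, Object> config, String key) {
            Object value = config.get(key);
            if (value == null || value.toString().isEmpty()) {
                throw new IllegalArgumentException("Missing required HBase config: " + key);
            }
        }
    }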


> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55941142
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map<String, Object> config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map<String, Object> config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

@satishd aren't the table name and cf specific to HBase? In that case a config 
method can take a map and do its validation, and that validation will be 
specific to HBaseWindowsStoreFactory.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192456#comment-15192456
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55941127
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into the HBase instance of the given configuration.
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger log = 
LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String 
tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal<HTable>() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}

[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread harshach
Github user harshach commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55941127
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStore.java
 ---
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This class stores entries into the HBase instance of the given configuration.
+ */
+public class HBaseWindowsStore implements WindowsStore {
+private static final Logger log = 
LoggerFactory.getLogger(HBaseWindowsStore.class);
+public static final String UTF_8 = "utf-8";
+
+private final ThreadLocal<HTable> threadLocalHtable;
+private Queue<HTable> htables = new ConcurrentLinkedQueue<>();
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStore(final Configuration config, final String 
tableName, byte[] family, byte[] qualifier) {
+this.family = family;
+this.qualifier = qualifier;
+
+threadLocalHtable = new ThreadLocal<HTable>() {
+@Override
+protected HTable initialValue() {
+try {
+HTable hTable = new HTable(config, tableName);
+htables.add(hTable);
+return hTable;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+};
+
+}
+
+private HTable htable() {
+return threadLocalHtable.get();
+}
+
+private byte[] effectiveKey(String key) {
+try {
+return key.getBytes(UTF_8);
+} catch (UnsupportedEncodingException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public Object get(String key) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKey = effectiveKey(key);
+Get get = new Get(effectiveKey);
+Result result = null;
+try {
+result = htable().get(get);
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+
+if(result.isEmpty()) {
+return null;
+}
+
+Kryo kryo = new Kryo();
+Input input = new Input(result.getValue(family, qualifier));
+Object resultObject = kryo.readClassAndObject(input);
+return resultObject;
+
+}
+
+@Override
+public Iterable<Object> get(List<String> keys) {
+List<Get> gets = new ArrayList<>();
+for (String key : keys) {
+WindowsStore.Entry.nonNullCheckForKey(key);
+
+byte[] effectiveKe

[jira] [Commented] (STORM-886) Automatic Back Pressure

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192403#comment-15192403
 ] 

ASF GitHub Bot commented on STORM-886:
--

Github user acommuni commented on the pull request:

https://github.com/apache/storm/pull/700#issuecomment-195988246
  
In case the external system is down, it could be interesting to pause the spout 
for an amount of time. The "OPEN" state of the circuit breaker could be directly 
linked to the spout. I don't know if the back pressure implementation is 
manageable through a public API, but it would be a nice enhancement to be able 
to implement a circuit-breaker algorithm with bolts and spouts.

Error counts and timeouts generated by requests to the external system can be 
detected by the bolts that use that external component and could be propagated 
to the spout. If the number of consecutive errors reaches a defined threshold, 
the spout could be paused by the bolt for an amount of time (CLOSED to OPEN 
state). After sleeping, the spout is considered to be in the HALF-OPEN state; if 
a new error occurs the spout sleeps for another amount of time, otherwise it 
goes back to the CLOSED state and continues reading new tuples.

Being able to use a circuit-breaker framework like Hystrix could be a nice 
enhancement of the back pressure feature.

https://github.com/Netflix/Hystrix
https://github.com/Netflix/Hystrix/wiki/How-it-Works
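
A toy sketch of the circuit-breaker states described above (CLOSED -> OPEN -> 
HALF-OPEN), driving a pause decision a spout could consult in nextTuple(); the 
thresholds and the way errors would be propagated from bolts are assumptions, 
not an existing Storm API:

    class SpoutCircuitBreakerSketch {
        enum State { CLOSED, OPEN, HALF_OPEN }

        private State state = State.CLOSED;
        private int consecutiveErrors = 0;
        private long openedAtMs = 0;

        private static final int ERROR_THRESHOLD = 5;
        private static final long SLEEP_MS = 30_000;

        synchronized void onError() {
            consecutiveErrors++;
            if (state == State.HALF_OPEN || consecutiveErrors >= ERROR_THRESHOLD) {
                state = State.OPEN;                   // trip (or re-trip) the breaker
                openedAtMs = System.currentTimeMillis();
            }
        }

        synchronized void onSuccess() {
            consecutiveErrors = 0;
            state = State.CLOSED;
        }

        // a spout would check this at the top of nextTuple() and skip emitting while paused
        synchronized boolean isPaused() {
            if (state == State.OPEN && System.currentTimeMillis() - openedAtMs >= SLEEP_MS) {
                state = State.HALF_OPEN;              // let one trial through after sleeping
            }
            return state == State.OPEN;
        }
    }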




> Automatic Back Pressure
> ---
>
> Key: STORM-886
> URL: https://issues.apache.org/jira/browse/STORM-886
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Zhuo Liu
> Fix For: 1.0.0
>
> Attachments: aSimpleExampleOfBackpressure.png, backpressure.png
>
>
> This new feature is aimed at automatic flow control through the topology DAG, 
> since different components may have unmatched tuple processing speeds. 
> Currently, tuples may get dropped if the downstream components cannot 
> process them as quickly, thereby wasting network bandwidth and 
> processing capability. In addition, it is difficult to tune the 
> max.spout.pending parameter for the best backpressure performance. Another big 
> motivation is that using max.spout.pending for flow control forces users to 
> enable acking, which does not make sense.
> Therefore, an automatic back pressure scheme is highly desirable. 
> Heron proposed a form of back pressure that does not rely on acking or max 
> spout pending. Instead, spouts throttle not only when max.spout.pending is 
> hit, but also if any bolt has gone over a high water mark in its input 
> queue and has not yet gone below a low water mark again. There is a lot of 
> room for potential improvement here around control theory and having spouts 
> only respond to downstream bolts backing up, but a simple bang-bang 
> controller like this is a great start.
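
A bare-bones sketch of the bang-bang scheme sketched above: throttle the spout 
when any bolt's input queue goes over the high water mark, and release only once 
it drops back under the low water mark. The fractions and the queue-utilisation 
source are illustrative assumptions:

    class BackpressureWatermarkSketch {
        private static final double HIGH_WATER_MARK = 0.9;  // fraction of queue capacity
        private static final double LOW_WATER_MARK = 0.4;

        private boolean throttled = false;

        // called with the fullest input-queue utilisation observed across all bolts
        boolean shouldThrottle(double maxQueueUtilisation) {
            if (!throttled && maxQueueUtilisation >= HIGH_WATER_MARK) {
                throttled = true;    // some bolt is backing up: stop emitting
            } else if (throttled && maxQueueUtilisation <= LOW_WATER_MARK) {
                throttled = false;   // the backlog has drained: resume emitting
            }
            return throttled;
        }
    }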



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-886] Automatic Back Pressure (ABP)

2016-03-13 Thread acommuni
Github user acommuni commented on the pull request:

https://github.com/apache/storm/pull/700#issuecomment-195988246
  
In case the external system is down, it could be interesting to pause the spout 
for an amount of time. The "OPEN" state of the circuit breaker could be directly 
linked to the spout. I don't know if the back pressure implementation is 
manageable through a public API, but it would be a nice enhancement to be able 
to implement a circuit-breaker algorithm with bolts and spouts.

Error counts and timeouts generated by requests to the external system can be 
detected by the bolts that use that external component and could be propagated 
to the spout. If the number of consecutive errors reaches a defined threshold, 
the spout could be paused by the bolt for an amount of time (CLOSED to OPEN 
state). After sleeping, the spout is considered to be in the HALF-OPEN state; if 
a new error occurs the spout sleeps for another amount of time, otherwise it 
goes back to the CLOSED state and continues reading new tuples.

Being able to use a circuit-breaker framework like Hystrix could be a nice 
enhancement of the back pressure feature.

https://github.com/Netflix/Hystrix
https://github.com/Netflix/Hystrix/wiki/How-it-Works




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55939157
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. To kill are those in allocated that are dead or disallowed.
+ * 2. Kill the ones that should be dead - read pids, kill -9 and individually remove file -
+ *    rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log).
+ * 3. Of the rest, figure out what assignments aren't yet satisfied.
+ * 4. Generate new worker ids, write new "approved workers" to LS.
+ * 5. Create local dir for worker id.
+ * 6. Launch new workers (give worker-id, port, and supervisor-id).
+ * 7. Wait for workers to launch.
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch
+ */
+@Override
+public void run() {
+LOG.debug("Syncing processes");
+try {
+Map conf = supervisorData.getConf();
+Map assignedExecutors = 
localState.getLocalAssignmentsMap()

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192400#comment-15192400
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55939157
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55939032
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch
+ */
+@Override
+public void run() {
+LOG.debug("Syncing processes");
+try {
+Map conf = supervisorData.getConf();
+Map assignedExecutors = 
localState.getLocalAssignmentsMap()

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192396#comment-15192396
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55939032
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55939026
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch
+ */
+@Override
+public void run() {
+LOG.debug("Syncing processes");
+try {
+Map conf = supervisorData.getConf();
+Map assignedExecutors = 
localState.getLocalAssignmentsMap()

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192395#comment-15192395
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55939026
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938368
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch
+ */
+@Override
+public void run() {
+LOG.debug("Syncing processes");
--- End diff --

this can be removed. 



[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192382#comment-15192382
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938368
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192381#comment-15192381
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938351
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
--- End diff --

is this a valid to-do?


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with bu

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938351
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
--- End diff --

is this a valid to-do?




[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192379#comment-15192379
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938337
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
--- End diff --

same comment is at the class level also which can be removed.


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938337
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
--- End diff --

same comment is at the class level also which can be removed.




[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192377#comment-15192377
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938211
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map environment, final String logPreFix,
+final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int workerLauncherAndWait(Map conf, String user, 
List args, final Map environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938211
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map environment, final String logPreFix,
+final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int workerLauncherAndWait(Map conf, String user, 
List args, final Map environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+}
+}
+
+public static void rmrAsUser(Map conf, String id, String path) throws 
IOException {
+String user = Utils.getFileOwner(path);
+String logPreFix = "rmr " + id;
+List 

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192376#comment-15192376
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938186
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map environment, final String logPreFix,
+final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int workerLauncherAndWait(Map conf, String user, 
List args, final Map environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938186
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map environment, final String logPreFix,
+final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int workerLauncherAndWait(Map conf, String user, 
List args, final Map environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+}
+}
+
+public static void rmrAsUser(Map conf, String id, String path) throws 
IOException {
+String user = Utils.getFileOwner(path);
+String logPreFix = "rmr " + id;
+List 

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938170
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class Supervisor {
+private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+// TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
+private SyncProcessEvent localSyncProcess;
+
+public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+this.localSyncProcess = localSyncProcess;
+}
+
+/**
+ * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
+ * 
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+public SupervisorManger mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
+SupervisorManger supervisorManger = null;
+try {
+LOG.info("Starting Supervisor with conf {}", conf);
+iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
+String path = ConfigUtils.supervisorTmpDir(conf);
+FileUtils.cleanDirectory(new File(path));
+
+final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
+Localizer localizer = supervisorData.getLocalizer();
+
+SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
+hb.run();
+// should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
+Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
+
+Set downdedStormId = 
SupervisorUtils.readDownLoadedStormIds(conf);
--- End diff --

nit. downloadedStormIds




[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192374#comment-15192374
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938170
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class Supervisor {
+private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+// TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
+private SyncProcessEvent localSyncProcess;
+
+public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+this.localSyncProcess = localSyncProcess;
+}
+
+/**
+ * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
+ * 
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+public SupervisorManger mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
+SupervisorManger supervisorManger = null;
+try {
+LOG.info("Starting Supervisor with conf {}", conf);
+iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
+String path = ConfigUtils.supervisorTmpDir(conf);
+FileUtils.cleanDirectory(new File(path));
+
+final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
+Localizer localizer = supervisorData.getLocalizer();
+
+SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
+hb.run();
+// should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
+Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
+
+Set downdedStormId = 
SupervisorUtils.readDownLoadedStormIds(conf);
--- End diff --

nit. downloadedStormIds


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-mig

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938152
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map environment, final String logPreFix,
+final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int workerLauncherAndWait(Map conf, String user, 
List args, final Map environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
+}
+}
+
+public static void rmrAsUser(Map conf, String id, String path) throws 
IOException {
+String user = Utils.getFileOwner(path);
+String logPreFix = "rmr " + id;
+List 

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192372#comment-15192372
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938152
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map environment, final String logPreFix,
+final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int workerLauncherAndWait(Map conf, String user, 
List args, final Map environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938035
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class Supervisor {
+private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+// TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
+private SyncProcessEvent localSyncProcess;
+
+public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+this.localSyncProcess = localSyncProcess;
+}
+
+/**
+ * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
+ * 
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+public SupervisorManger mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
+SupervisorManger supervisorManger = null;
+try {
+LOG.info("Starting Supervisor with conf {}", conf);
+iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
+String path = ConfigUtils.supervisorTmpDir(conf);
+FileUtils.cleanDirectory(new File(path));
+
+final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
+Localizer localizer = supervisorData.getLocalizer();
+
+SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
+hb.run();
+// should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
+Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
+
+Set downdedStormId = 
SupervisorUtils.readDownLoadedStormIds(conf);
+for (String stormId : downdedStormId) {
+SupervisorUtils.addBlobReferences(localizer, stormId, 
conf);
+}
+// do this after adding the references so we don't try to 
clean things being used
+localizer.startCleaner();
+
+EventManagerImp syncSupEventManager = new 
EventManagerImp(false);
+EventManagerImp syncProcessManager = new 
EventManagerImp(false);
+
+SyncProcessEvent syncProcessEvent = null;
+if (ConfigUtils.isLocalMode(conf)) {
+localSyncProcess.init(supervisorData);
+syncProcessEvent = localSyncProcess;
+  

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192370#comment-15192370
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938035
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class Supervisor {
+private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+// TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
+private SyncProcessEvent localSyncProcess;
+
+public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+this.localSyncProcess = localSyncProcess;
+}
+
+/**
+ * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
+ * 
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+public SupervisorManger mkSupervisor(final Map conf, IContext 
sharedContext, ISupervisor iSupervisor) throws Exception {
+SupervisorManger supervisorManger = null;
+try {
+LOG.info("Starting Supervisor with conf {}", conf);
+iSupervisor.prepare(conf, 
ConfigUtils.supervisorIsupervisorDir(conf));
+String path = ConfigUtils.supervisorTmpDir(conf);
+FileUtils.cleanDirectory(new File(path));
+
+final SupervisorData supervisorData = new SupervisorData(conf, 
sharedContext, iSupervisor);
+Localizer localizer = supervisorData.getLocalizer();
+
+SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, 
supervisorData);
+hb.run();
+// should synchronize supervisor so it doesn't launch anything 
after being down (optimization)
+Integer heartbeatFrequency = 
Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+supervisorData.getHeartbeatTimer().scheduleRecurring(0, 
heartbeatFrequency, hb);
+
+Set downdedStormId = 
SupervisorUtils.readDownLoadedStormIds(conf);
+for (String stormId : downdedStormId) {
+SupervisorUtils.addBlobReferences(localizer, stormId, 
conf);
+}
+// do this after adding the references so we don't try to 
clean things being used
+localizer.startCleaner();
+
+EventManagerImp syncSupEventManager = new 
EventManagerImp(false);
+EventManagerImp syncProcessManager = new 
EventManagerImp(false);

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192368#comment-15192368
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938008
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorManger.class);
+private final EventManager eventManager;
+private final EventManager processesEventManager;
+private SupervisorData supervisorData;
+
+public SupervisorManger(SupervisorData supervisorData, EventManager 
eventManager, EventManager processesEventManager) {
+this.eventManager = eventManager;
+this.supervisorData = supervisorData;
+this.processesEventManager = processesEventManager;
+}
+
+public void shutdown() {
+LOG.info("Shutting down supervisor{}", 
supervisorData.getSupervisorId());
+supervisorData.setActive(false);
+try {
+supervisorData.getHeartbeatTimer().close();
+supervisorData.getEventTimer().close();
+supervisorData.getBlobUpdateTimer().close();
+eventManager.close();
+processesEventManager.close();
+} catch (Exception e) {
+throw Utils.wrapInRuntime(e);
+}
+supervisorData.getStormClusterState().disconnect();
+}
+
+@Override
+public void shutdownAllWorkers() {
+
+Collection workerIds = 
SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
+try {
+for (String workerId : workerIds) {
+SupervisorUtils.shutWorker(supervisorData, workerId);
+}
+} catch (Exception e) {
+LOG.error("shutWorker failed");
+throw Utils.wrapInRuntime(e);
+}
+}
+
+@Override
+public Map getConf() {
+return supervisorData.getConf();
+}
+
+@Override
+public String getId() {
+return supervisorData.getSupervisorId();
+}
+
+@Override
+public boolean isWaiting() {
+if (!supervisorData.isActive()) {
+return true;
+}
+
+if (supervisorData.getHeartbeatTimer().isTimerWaiting() && 
supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
+&& processesEventManager.waiting()) {
+return true;
+}
+return false;
+}
+
+public void run() {
--- End diff --

It is not very intuitive that SupervisorManager is also a shutdown hook. You can remove this from here and instead use an anonymous shutdown hook.
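
A rough sketch of that suggestion, assuming the hook only needs to delegate to the manager mkSupervisor builds (the variable names and the body of the hook are illustrative, not the actual patch):

    // Hypothetical alternative inside mkSupervisor: register an anonymous
    // shutdown hook instead of making SupervisorManger itself Runnable.
    final SupervisorManger manager = supervisorManger;
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            // whatever SupervisorManger.run() currently does on shutdown,
            // e.g. shutting down all workers
            manager.shutdownAllWorkers();
        }
    }, "supervisor-shutdown-hook"));

SupervisorManger could then drop the Runnable interface and keep only its shutdown methods.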


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55938008
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorManger.class);
+private final EventManager eventManager;
+private final EventManager processesEventManager;
+private SupervisorData supervisorData;
+
+public SupervisorManger(SupervisorData supervisorData, EventManager 
eventManager, EventManager processesEventManager) {
+this.eventManager = eventManager;
+this.supervisorData = supervisorData;
+this.processesEventManager = processesEventManager;
+}
+
+public void shutdown() {
+LOG.info("Shutting down supervisor{}", 
supervisorData.getSupervisorId());
+supervisorData.setActive(false);
+try {
+supervisorData.getHeartbeatTimer().close();
+supervisorData.getEventTimer().close();
+supervisorData.getBlobUpdateTimer().close();
+eventManager.close();
+processesEventManager.close();
+} catch (Exception e) {
+throw Utils.wrapInRuntime(e);
+}
+supervisorData.getStormClusterState().disconnect();
+}
+
+@Override
+public void shutdownAllWorkers() {
+
+Collection workerIds = 
SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
+try {
+for (String workerId : workerIds) {
+SupervisorUtils.shutWorker(supervisorData, workerId);
+}
+} catch (Exception e) {
+LOG.error("shutWorker failed");
+throw Utils.wrapInRuntime(e);
+}
+}
+
+@Override
+public Map getConf() {
+return supervisorData.getConf();
+}
+
+@Override
+public String getId() {
+return supervisorData.getSupervisorId();
+}
+
+@Override
+public boolean isWaiting() {
+if (!supervisorData.isActive()) {
+return true;
+}
+
+if (supervisorData.getHeartbeatTimer().isTimerWaiting() && 
supervisorData.getEventTimer().isTimerWaiting() && eventManager.waiting()
+&& processesEventManager.waiting()) {
+return true;
+}
+return false;
+}
+
+public void run() {
--- End diff --

It is not very intuitive that SupervisorManager is also a shutdown hook. You can remove this from here and instead use an anonymous shutdown hook.




[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192367#comment-15192367
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937985
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
--- End diff --

nit. class name should be SupervisorManager. 


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.
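
To illustrate the last quoted point, here is a minimal, self-contained sketch (not the actual port) of how an event-manager style queue maps onto a built-in java threadpool; the class name and the sample events are made up for the example:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class EventLoopSketch {
        public static void main(String[] args) throws InterruptedException {
            // A single-threaded executor gives the same guarantee an event manager
            // loop provides: events run one at a time, in submission order.
            ExecutorService events = Executors.newSingleThreadExecutor();

            events.submit(() -> System.out.println("sync-supervisor event"));
            events.submit(() -> System.out.println("sync-processes event"));

            // Orderly shutdown, roughly analogous to closing the event manager.
            events.shutdown();
            events.awaitTermination(10, TimeUnit.SECONDS);
        }
    }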





[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937985
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
--- End diff --

nit. class name should be SupervisorManager. 




[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937914
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorManger.class);
+private final EventManager eventManager;
+private final EventManager processesEventManager;
+private SupervisorData supervisorData;
+
+public SupervisorManger(SupervisorData supervisorData, EventManager 
eventManager, EventManager processesEventManager) {
+this.eventManager = eventManager;
+this.supervisorData = supervisorData;
+this.processesEventManager = processesEventManager;
+}
+
+public void shutdown() {
+LOG.info("Shutting down supervisor{}", 
supervisorData.getSupervisorId());
--- End diff --

minor. space after supervisor. 
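
i.e. the quoted log call would presumably become:

    LOG.info("Shutting down supervisor {}", supervisorData.getSupervisorId());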




[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192365#comment-15192365
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937914
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.event.EventManager;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class SupervisorManger implements SupervisorDaemon, DaemonCommon, 
Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorManger.class);
+private final EventManager eventManager;
+private final EventManager processesEventManager;
+private SupervisorData supervisorData;
+
+public SupervisorManger(SupervisorData supervisorData, EventManager 
eventManager, EventManager processesEventManager) {
+this.eventManager = eventManager;
+this.supervisorData = supervisorData;
+this.processesEventManager = processesEventManager;
+}
+
+public void shutdown() {
+LOG.info("Shutting down supervisor{}", 
supervisorData.getSupervisorId());
--- End diff --

minor. space after supervisor. 


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.





[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192361#comment-15192361
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937791
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class Supervisor {
+private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+// TODO: to be removed after porting worker.clj. localSyncProcess is 
intended to start local supervisor
+private SyncProcessEvent localSyncProcess;
+
+public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+this.localSyncProcess = localSyncProcess;
+}
+
+/**
+ * in local state, supervisor stores who its current assignments are 
another thread launches events to restart any dead processes if necessary
+ * 
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+SupervisorManger supervisorManger = null;
+try {
+LOG.info("Starting Supervisor with conf {}", conf);
+iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+String path = ConfigUtils.supervisorTmpDir(conf);
+FileUtils.cleanDirectory(new File(path));
+
+final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
+Localizer localizer = supervisorData.getLocalizer();
+
+SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
+hb.run();
+// should synchronize supervisor so it doesn't launch anything after being down (optimization)
+Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
+
+Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
+for (String stormId : downdedStormId) {
+SupervisorUtils.addBlobReferences(localizer, stormId, conf);
+}
+// do this after adding the references so we don't try to clean things being used
+localizer.startCleaner();
+
+EventManagerImp syncSupEventManager = new EventManagerImp(false);
+EventManagerImp syncProcessManager = new EventManagerImp(false);

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937791
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Supervisor.java ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.supervisor.timer.RunProfilerActions;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck;
+import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat;
+import org.apache.storm.daemon.supervisor.timer.UpdateBlobs;
+import org.apache.storm.event.EventManagerImp;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class Supervisor {
+private static Logger LOG = LoggerFactory.getLogger(Supervisor.class);
+
+// TODO: to be removed after porting worker.clj. localSyncProcess is intended to start local supervisor
+private SyncProcessEvent localSyncProcess;
+
+public void setLocalSyncProcess(SyncProcessEvent localSyncProcess) {
+this.localSyncProcess = localSyncProcess;
+}
+
+/**
+ * In local state, the supervisor stores its current assignments; another thread launches events to restart any dead processes if necessary.
+ *
+ * @param conf
+ * @param sharedContext
+ * @param iSupervisor
+ * @return
+ * @throws Exception
+ */
+public SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
+SupervisorManger supervisorManger = null;
+try {
+LOG.info("Starting Supervisor with conf {}", conf);
+iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
+String path = ConfigUtils.supervisorTmpDir(conf);
+FileUtils.cleanDirectory(new File(path));
+
+final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
+Localizer localizer = supervisorData.getLocalizer();
+
+SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
+hb.run();
+// should synchronize supervisor so it doesn't launch anything after being down (optimization)
+Integer heartbeatFrequency = Utils.getInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS));
+supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);
+
+Set<String> downdedStormId = SupervisorUtils.readDownLoadedStormIds(conf);
+for (String stormId : downdedStormId) {
+SupervisorUtils.addBlobReferences(localizer, stormId, conf);
+}
+// do this after adding the references so we don't try to clean things being used
+localizer.startCleaner();
+
+EventManagerImp syncSupEventManager = new EventManagerImp(false);
+EventManagerImp syncProcessManager = new EventManagerImp(false);
+
+SyncProcessEvent syncProcessEvent = null;
+if (ConfigUtils.isLocalMode(conf)) {
+localSyncProcess.init(supervisorData);
+syncProcessEvent = localSyncProcess;
+  

[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192357#comment-15192357
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937648
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
--- End diff --

Please ignore my above comment. I see that it is being scheduled. 


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937648
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
--- End diff --

Please ignore my above comment. I see that it is being scheduled. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192354#comment-15192354
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937586
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
--- End diff --

does it need to be Runnable?
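
For what it's worth, implementing Runnable does match how the heartbeat is used in the
Supervisor.java diff earlier in this thread, where hb.run() is called once and the same
object is then handed to a recurring timer. A minimal, self-contained sketch of that
pattern follows; it uses a JDK ScheduledExecutorService as a stand-in for Storm's timer,
and the class name and period are illustrative assumptions, not code from the PR.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-in for SupervisorHeartbeat: a Runnable can be invoked once
    // immediately and then scheduled on a recurring timer with the same object.
    public class HeartbeatSketch implements Runnable {
        @Override
        public void run() {
            System.out.println("heartbeat at " + System.currentTimeMillis());
        }

        public static void main(String[] args) {
            HeartbeatSketch hb = new HeartbeatSketch();
            hb.run(); // one immediate heartbeat before scheduling, as in mkSupervisor
            ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
            timer.scheduleAtFixedRate(hb, 0, 5, TimeUnit.SECONDS);
        }
    }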


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937596
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
+
+ private final IStormClusterState stormClusterState;
+ private final String supervisorId;
+ private final Map conf;
+ private final SupervisorData supervisorData;
+
+public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+this.stormClusterState = supervisorData.getStormClusterState();
+this.supervisorId = supervisorData.getSupervisorId();
+this.supervisorData = supervisorData;
+this.conf = conf;
+}
+
+private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
--- End diff --

can this be renamed to buildSupervisorInfo or constructSupervisorInfo?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192355#comment-15192355
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937596
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
+
+ private final IStormClusterState stormClusterState;
+ private final String supervisorId;
+ private final Map conf;
+ private final SupervisorData supervisorData;
+
+public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+this.stormClusterState = supervisorData.getStormClusterState();
+this.supervisorId = supervisorData.getSupervisorId();
+this.supervisorData = supervisorData;
+this.conf = conf;
+}
+
+private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
--- End diff --

can this be renamed to buildSupervisorInfo or constructSupervisorInfo?


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937586
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
--- End diff --

does it need to be Runnable?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1279) port backtype.storm.daemon.supervisor to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192351#comment-15192351
 ] 

ASF GitHub Bot commented on STORM-1279:
---

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937502
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java 
---
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+public class StandaloneSupervisor implements ISupervisor {
+private String supervisorId;
+private Map conf;
+
+@Override
+public void prepare(Map stormConf, String schedulerLocalDir) {
+try {
+LocalState localState = new LocalState(schedulerLocalDir);
+String supervisorId = localState.getSupervisorId();
+if (supervisorId == null) {
+supervisorId = generateSupervisorId();
+localState.setSupervisorId(supervisorId);
+}
+this.conf = stormConf;
+this.supervisorId = supervisorId;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public String getSupervisorId() {
+return supervisorId;
+}
+
+@Override
+public String getAssignmentId() {
+return supervisorId;
+}
+
+@Override
+// @return is vector which need be converted to be int
--- End diff --

this comment can be removed


> port backtype.storm.daemon.supervisor to java
> -
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: John Fang
>  Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55937502
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java 
---
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+public class StandaloneSupervisor implements ISupervisor {
+private String supervisorId;
+private Map conf;
+
+@Override
+public void prepare(Map stormConf, String schedulerLocalDir) {
+try {
+LocalState localState = new LocalState(schedulerLocalDir);
+String supervisorId = localState.getSupervisorId();
+if (supervisorId == null) {
+supervisorId = generateSupervisorId();
+localState.setSupervisorId(supervisorId);
+}
+this.conf = stormConf;
+this.supervisorId = supervisorId;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public String getSupervisorId() {
+return supervisorId;
+}
+
+@Override
+public String getAssignmentId() {
+return supervisorId;
+}
+
+@Override
+// @return is vector which need be converted to be int
--- End diff --

this comment can be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1235) port backtype.storm.security.auth.ReqContext-test to java

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192345#comment-15192345
 ] 

ASF GitHub Bot commented on STORM-1235:
---

Github user abhishekagarwal87 commented on the pull request:

https://github.com/apache/storm/pull/1200#issuecomment-195953877
  
Thanks @harshach. Right now, we have one JIRA for each clojure file that is 
to be ported into Java. All the JIRA tasks have already been created. Do you 
want me to file a single JIRA and resolve others as duplicate? The tests, by 
the way, aren't strictly related to each other.


> port  backtype.storm.security.auth.ReqContext-test to java
> --
>
> Key: STORM-1235
> URL: https://issues.apache.org/jira/browse/STORM-1235
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-core
>Reporter: Robert Joseph Evans
>Assignee: Abhishek Agarwal
>  Labels: java-migration, jstorm-merger
>
> junit test migration



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: Tests - STORM-1235, STORM-1236, STORM-1237, ST...

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on the pull request:

https://github.com/apache/storm/pull/1200#issuecomment-195953877
  
Thanks @harshach. Right now, we have one JIRA for each clojure file that is 
to be ported into Java. All the JIRA tasks have already been created. Do you 
want me to file a single JIRA and resolve others as duplicate? The tests, by 
the way, aren't strictly related to each other.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-1614) Clean backpressure zk node in do-cleanup

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-1614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192343#comment-15192343
 ] 

ASF GitHub Bot commented on STORM-1614:
---

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1206#issuecomment-195952996
  
LGTM


> Clean backpressure zk node in do-cleanup
> 
>
> Key: STORM-1614
> URL: https://issues.apache.org/jira/browse/STORM-1614
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Reporter: Alessandro Bellina
>Assignee: Alessandro Bellina
>Priority: Minor
>
> Currently the backpressure node is being removed in killTopologyWithOpts in 
> nimbus. Remove instead like the other ZK nodes in do-cleanup for inactive 
> topos.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-1614: backpressure init and cleanup chan...

2016-03-13 Thread vesense
Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1206#issuecomment-195952996
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-822: Kafka Spout New Consumer API

2016-03-13 Thread abhishekagarwal87
Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55936769
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

@harshach the current Kafka spout only keeps offsets in memory. This new spout 
keeps the actual record in memory as well. Offsets are 64 bits long, but there 
is no limit on the record size. 
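
To make the memory comparison concrete, here is a minimal sketch (not from this PR)
contrasting offset-only bookkeeping with retaining whole records, using the plain
kafka-clients types; the class and field names, and the one-entry-per-partition
simplification, are illustrative assumptions.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of two ways to track emitted-but-not-yet-committed messages.
    // An offset is a single long per entry; a ConsumerRecord also pins its
    // key and value payloads (of arbitrary size) in memory until acked.
    public class InFlightTrackingSketch {
        // offset-only bookkeeping: small, fixed-size state per entry
        private final Map<TopicPartition, Long> pendingOffsets = new HashMap<>();

        // whole-record bookkeeping: unbounded, holds the full payload per entry
        private final Map<TopicPartition, ConsumerRecord<String, String>> pendingRecords = new HashMap<>();

        public void track(ConsumerRecord<String, String> record) {
            TopicPartition tp = new TopicPartition(record.topic(), record.partition());
            pendingOffsets.put(tp, record.offset());   // ~8 bytes of payload per entry
            pendingRecords.put(tp, record);            // retains key/value bytes as well
        }
    }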


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-822) As a storm developer I’d like to use the new kafka consumer API (0.8.3) to reduce dependencies and use long term supported kafka apis

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192341#comment-15192341
 ] 

ASF GitHub Bot commented on STORM-822:
--

Github user abhishekagarwal87 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1131#discussion_r55936769
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+private final Map topicToStream;
+
+private KafkaSpoutStreams(Builder builder) {
+this.topicToStream = builder.topicToStream;
+}
+
+/**
+ * @param topic the topic for which to get output fields
+ * @return the output fields declared
+ */
+public Fields getOutputFields(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getOutputFields();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @param topic the topic for which to get the stream id
+ * @return the id of the stream to where the tuples are emitted
+ */
+public String getStreamId(String topic) {
+if (topicToStream.containsKey(topic)) {
+return topicToStream.get(topic).getStreamId();
+}
+throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+}
+
+/**
+ * @return list of topics subscribed and emitting tuples to a stream 
as configured by {@link KafkaSpoutStream}
+ */
+public List getTopics() {
+return new ArrayList<>(topicToStream.keySet());
+}
+
+void declareOutputFields(OutputFieldsDeclarer declarer) {
+for (KafkaSpoutStream stream : topicToStream.values()) {
+declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
+}
+}
+
+void emit(SpoutOutputCollector collector, MessageId messageId) {
+collector.emit(getStreamId(messageId.topic()), 
messageId.getTuple(), messageId);
--- End diff --

@harshach the current Kafka spout only keeps offsets in memory. This new spout 
keeps the actual record in memory as well. Offsets are 64 bits long, but there 
is no limit on the record size. 


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
>  Issue Type: Story
>  Components: storm-kafka
>Reporter: Thomas Becker
>Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: Fix minor bug in RAS Tests

2016-03-13 Thread vesense
Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1207#issuecomment-195952132
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-971) Storm-Kafka: Emit metric for messages lost due to kafka retention

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192337#comment-15192337
 ] 

ASF GitHub Bot commented on STORM-971:
--

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1208#issuecomment-195951022
  
LGTM


> Storm-Kafka: Emit metric for messages lost due to kafka retention
> -
>
> Key: STORM-971
> URL: https://issues.apache.org/jira/browse/STORM-971
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-kafka
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>
> In case of TopicOffsetOutOfRange exception, it is useful to know just how 
> many unread messages were lost. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-971: Metric for messages lost due to kaf...

2016-03-13 Thread vesense
Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/1208#issuecomment-195951022
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55934087
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

@harshach All these fields are required by the windows store factory and none of 
them is optional, which is why they are passed through the constructor. We could 
enclose them in a configuration class (with immutable fields) and pass that to 
the constructor, the way it is done for HBaseStateFactory(Options). 
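
A rough sketch of that option, with a hypothetical options class using final fields
(the names below are illustrative, not part of the PR):

    import java.io.Serializable;
    import java.util.Map;

    // Hypothetical holder for the required HBase settings, passed to the factory
    // constructor as one object, similar in spirit to HBaseStateFactory's Options.
    public class HBaseWindowsStoreOptions implements Serializable {
        public final Map<String, Object> config;
        public final String tableName;
        public final byte[] family;
        public final byte[] qualifier;

        public HBaseWindowsStoreOptions(Map<String, Object> config, String tableName,
                                        byte[] family, byte[] qualifier) {
            this.config = config;
            this.tableName = tableName;
            this.family = family;
            this.qualifier = qualifier;
        }
    }

The factory constructor could then take a single HBaseWindowsStoreOptions argument
instead of four separate parameters.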


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192230#comment-15192230
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55934087
  
--- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/windowing/HBaseWindowsStoreFactory.java
 ---
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hbase.trident.windowing;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.storm.trident.windowing.WindowsStore;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+
+import java.util.Map;
+
+public class HBaseWindowsStoreFactory implements WindowsStoreFactory {
+private final Map config;
+private final String tableName;
+private final byte[] family;
+private final byte[] qualifier;
+
+public HBaseWindowsStoreFactory(Map config, String tableName, byte[] family, byte[] qualifier) {
--- End diff --

@harshach All these fields are required by the windows store factory and none of 
them is optional, which is why they are passed through the constructor. We could 
enclose them in a configuration class (with immutable fields) and pass that to 
the constructor, the way it is done for HBaseStateFactory(Options). 


> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-676) Storm Trident support for sliding/tumbling windows

2016-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192224#comment-15192224
 ] 

ASF GitHub Bot commented on STORM-676:
--

Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55933892
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
 ---
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
--- End diff --

Updated with missing java doc.


> Storm Trident support for sliding/tumbling windows
> --
>
> Key: STORM-676
> URL: https://issues.apache.org/jira/browse/STORM-676
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Sriharsha Chintalapani
>Assignee: Satish Duggana
> Fix For: 1.0.0, 2.0.0
>
> Attachments: StormTrident_windowing_support-676.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request: STORM-676 Trident implementation for sliding a...

2016-03-13 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/1072#discussion_r55933892
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/trident/TridentWindowingInmemoryStoreTopology.java
 ---
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.starter.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseAggregator;
+import org.apache.storm.trident.operation.BaseFunction;
+import org.apache.storm.trident.operation.Function;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.operation.TridentOperationContext;
+import org.apache.storm.trident.operation.builtin.Debug;
+import org.apache.storm.trident.testing.CountAsAggregator;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.windowing.InMemoryWindowsStoreFactory;
+import org.apache.storm.trident.windowing.WindowsStoreFactory;
+import org.apache.storm.trident.windowing.config.SlidingCountWindow;
+import org.apache.storm.trident.windowing.config.SlidingDurationWindow;
+import org.apache.storm.trident.windowing.config.TumblingCountWindow;
+import org.apache.storm.trident.windowing.config.TumblingDurationWindow;
+import org.apache.storm.trident.windowing.config.WindowConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
--- End diff --

Updated with missing java doc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---