[jira] [Comment Edited] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread Nathan Marz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233947#comment-14233947
 ] 

Nathan Marz edited comment on STORM-561 at 12/4/14 6:36 AM:


[~amontalenti] Why are you using the Clojure DSL for streamparse? I think 
you'll find it a lot easier to create the Thrift objects directly in Python 
code. Storm is designed so that you don't even have to touch Java or any JVM 
language in order to construct and submit topologies.

Storm already has the ability to create topologies completely dynamically, 
where we define "dynamically" as not needing any recompilation. Storm's Thrift 
interfaces allow you to:

1. Create spout and bolt objects of any language implementation from any other 
language. The ComponentObject struct can be specified as a serialized Java 
object, as a Java class name plus arguments, or as a ShellComponent (for 
running spouts/bolts via subprocesses, to allow for other languages).
2. Define topologies from any language. Storm gets this for free by the nature 
of topologies being Thrift objects.
3. Submit topologies from any language. Again, Storm gets this for free by the 
nature of Nimbus being a Thrift server and topologies being Thrift objects.

This means that the majority of what's been proposed in this issue is 
redundant with what's already in Storm. The exception is Trident, which 
should have its own issue opened.

The direction I'd like a patch for this issue to take is a nice *library* in 
an interpreted language (like Python) that provides a clean wrapper interface 
around the Thrift structures, since manipulating Thrift objects directly is a 
little verbose. In addition, it can handle packaging any artifacts the 
topology will need (like spout and bolt implementations) into a .jar file. 
The generated Python code for manipulating the Thrift structures is packaged 
with Storm at storm-core/src/py/.

For reference, here's the storm.thrift file: 
https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift
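For a concrete picture of what "build the topology from Python" looks like, 
here is a rough sketch. It uses plain dataclasses as stand-ins for the 
Thrift-generated classes (the real classes are generated from storm.thrift 
into storm-core/src/py/, and their exact constructors and field names may 
differ), so treat every name below as illustrative rather than as Storm's 
actual API:

```python
from dataclasses import dataclass, field

# Stand-ins for the Thrift-generated structs (illustrative only).
@dataclass
class ShellComponent:
    execution_command: str
    script: str

@dataclass
class ComponentObject:
    # In the real Thrift union, exactly one of these is set.
    serialized_java: bytes = None
    shell: ShellComponent = None

@dataclass
class SpoutSpec:
    spout_object: ComponentObject
    parallelism_hint: int = 1

@dataclass
class Bolt:
    bolt_object: ComponentObject
    inputs: dict = field(default_factory=dict)  # upstream component -> grouping
    parallelism_hint: int = 1

@dataclass
class StormTopology:
    spouts: dict = field(default_factory=dict)
    bolts: dict = field(default_factory=dict)

# Build a topology purely from Python, no JVM involved: a shell-based spout
# feeding a shell-based bolt over a shuffle grouping.
topology = StormTopology()
topology.spouts["sentences"] = SpoutSpec(
    spout_object=ComponentObject(shell=ShellComponent("python", "sentence_spout.py")),
    parallelism_hint=2,
)
topology.bolts["counter"] = Bolt(
    bolt_object=ComponentObject(shell=ShellComponent("python", "count_bolt.py")),
    inputs={"sentences": "shuffle"},
    parallelism_hint=4,
)
```

With the real generated classes, an object built this way would be serialized 
and handed to Nimbus over its Thrift interface.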



was (Author: marz):
[~amontalenti] Why are you using the Clojure DSL for streamparse? I think 
you'll find it a lot easier to create the Thrift objects directly in Python 
code. Storm is designed so that you don't even have to touch Java or any JVM 
language in order to construct and submit topologies.

Storm already has the ability to create topologies completely dynamically, 
where we define "dynamically" as not needing any recompilation. Storm's Thrift 
interfaces allow you to:

1. Create spout and bolt objects of any language implementation from any other 
language. The ComponentObject struct can be specified as a serialized Java 
object, as a Java class name plus arguments, or as a ShellComponent (for 
running spouts/bolts via subprocesses, to allow for other languages).
2. Define topologies from any language. Storm gets this for free by the nature 
of topologies being Thrift objects.
3. Submit topologies from any language. Again, Storm gets this for free by the 
nature of Nimbus being a Thrift server and topologies being Thrift objects.

This means that the majority of what's been proposed in this issue is 
redundant with what's already in Storm. The exception is Trident, which 
should have its own issue opened.

The direction I'd like a patch for this issue to take is a nice *library* in 
an interpreted language (like Python) that provides a clean wrapper interface 
around the Thrift structures, since manipulating Thrift objects directly is a 
little verbose. In addition, it can handle packaging any artifacts the 
topology will need (like spout and bolt implementations) into a .jar file. 
The generated Python code for manipulating the Thrift structures is packaged 
with Storm at storm-core/src/py/.


[jira] [Commented] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread Nathan Marz (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233947#comment-14233947
 ] 

Nathan Marz commented on STORM-561:
---

[~amontalenti] Why are you using the Clojure DSL for streamparse? I think 
you'll find it a lot easier to create the Thrift objects directly in Python 
code. Storm is designed so that you don't even have to touch Java or any JVM 
language in order to construct and submit topologies.

Storm already has the ability to create topologies completely dynamically, 
where we define "dynamically" as not needing any recompilation. Storm's Thrift 
interfaces allow you to:

1. Create spout and bolt objects of any language implementation from any other 
language. The ComponentObject struct can be specified as a serialized Java 
object, as a Java class name plus arguments, or as a ShellComponent (for 
running spouts/bolts via subprocesses, to allow for other languages).
2. Define topologies from any language. Storm gets this for free by the nature 
of topologies being Thrift objects.
3. Submit topologies from any language. Again, Storm gets this for free by the 
nature of Nimbus being a Thrift server and topologies being Thrift objects.

This means that the majority of what's been proposed in this issue is 
redundant with what's already in Storm. The exception is Trident, which 
should have its own issue opened.

The direction I'd like a patch for this issue to take is a nice *library* in 
an interpreted language (like Python) that provides a clean wrapper interface 
around the Thrift structures, since manipulating Thrift objects directly is a 
little verbose. In addition, it can handle packaging any artifacts the 
topology will need (like spout and bolt implementations) into a .jar file. 
The generated Python code for manipulating the Thrift structures is packaged 
with Storm at storm-core/src/py/.


> Add ability to create topologies dynamically
> 
>
> Key: STORM-561
> URL: https://issues.apache.org/jira/browse/STORM-561
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Nathan Leung
>Assignee: Nathan Leung
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> It would be nice if a storm topology could be built dynamically, instead of 
> requiring a recompile to change parameters (e.g. number of workers, number of 
> tasks, layout, etc).
> I would propose the following data structures for building core storm 
> topologies.  I haven't done a design for trident yet but the intention would 
> be to add trident support when core storm support is complete (or in parallel 
> if there are other people working on it):
> {code}
> // fields value and arguments are mutually exclusive
> class Argument {
>     String argumentType;       // Class used to look up arguments in method/constructor
>     String implementationType; // Class used to create this argument
>     String value;              // String used to construct this argument
>     List<Argument> arguments;  // arguments used to build this argument
> }
> class Dependency {
>     String upstreamComponent;  // name of upstream component
>     String grouping;
>     List<Argument> arguments;  // arguments for the grouping
> }
> class StormSpout {
>     String name;
>     String klazz;              // Class of this spout
>     List<Argument> arguments;
>     int numTasks;
>     int numExecutors;
> }
> class StormBolt {
>     String name;
>     String klazz;              // Class of this bolt
>     List<Argument> arguments;
>     int numTasks;
>     int numExecutors;
>     List<Dependency> dependencies;
> }
> class StormTopologyRepresentation {
>     String name;
>     List<StormSpout> spouts;
>     List<StormBolt> bolts;
>     Map config;
>     int numWorkers;
> }
> {code}
> Topology creation will be built on top of the data structures above.  The 
> benefits:
> * Dependency free.  Code to unmarshal from JSON, XML, etc., can be kept in 
> extensions or as examples, and users can write a different unmarshaller if 
> they want to use a different text representation.
> * Support for arbitrary spout and bolt types
> * Support for all groupings and streams, via reflection
> * Ability to specify the configuration map via a config file
> * Reification of spout / bolt / dependency arguments
> ** Recursive argument reification for complex objects


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread John Reilly (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233795#comment-14233795
 ] 

John Reilly commented on STORM-561:
---

I was using Spring as an example, but it should be equally applicable to any 
other config-like mechanism that already allows you to instantiate objects.

I see this as possibly being two problems:
* instantiation of the objects used to create the bolts, spouts, and 
constructor params
* creation of the storm topology using these objects.

I see your Argument class as defined above as something similar to the way 
Spring uses a "bean" declaration to create an object. All the other classes 
relate to how a storm topology is defined. If object instantiation is 
something that someone's config framework of choice provides, it would be 
nice to be able to use that existing mechanism and just use the classes 
related to topology definition.

This is just my 2c though, so take it with a grain of salt, since it's not a 
feature I see myself using (at least in the near future).

cheers,
John






[jira] [Assigned] (STORM-391) KafkaSpout to await for the topic

2014-12-03 Thread Kai Sasaki (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kai Sasaki reassigned STORM-391:


Assignee: Kai Sasaki

> KafkaSpout to await for the topic
> -
>
> Key: STORM-391
> URL: https://issues.apache.org/jira/browse/STORM-391
> Project: Apache Storm
>  Issue Type: Improvement
>Affects Versions: 0.9.2-incubating
>Reporter: Alexey Raga
>Assignee: Kai Sasaki
>  Labels: features
>
> When the topic does not yet exist and the consumer is asked to consume from 
> it, the default behaviour for the Kafka high-level consumer is to "await" 
> the topic without failing.
> KafkaSpout currently fails trying to get partition information for a topic 
> that does not exist.
> It may be a good idea to have the same behaviour in KafkaSpout; it can 
> probably be implemented through ZooKeeper watchers: if the topic does not 
> exist, set up a watcher and do nothing until it fires.





[jira] [Commented] (STORM-391) KafkaSpout to await for the topic

2014-12-03 Thread Kai Sasaki (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233722#comment-14233722
 ] 

Kai Sasaki commented on STORM-391:
--

[~sriharsha] I'm implementing this option with an event watcher for ZooKeeper 
now. I'll send a PR for this patch; please review it then.






[jira] [Updated] (STORM-511) Storm-Kafka spout keeps sending fetch requests with invalid offset

2014-12-03 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated STORM-511:
-
Assignee: Viktor Taranenko  (was: Sriharsha Chintalapani)

> Storm-Kafka spout keeps sending fetch requests with invalid offset
> --
>
> Key: STORM-511
> URL: https://issues.apache.org/jira/browse/STORM-511
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.9.3
>Reporter: Viktor Taranenko
>Assignee: Viktor Taranenko
>  Labels: kafka
> Fix For: 0.9.3, 0.9.3-rc2
>
>
> With the default behaviour (KafkaConfig.useStartOffsetTimeIfOffsetOutOfRange 
> == true), when Kafka returns an error about the offset being out of range, 
> storm.kafka.KafkaUtils.fetchMessages fixes the offset in local scope and 
> retries the fetch request. But if no more messages have appeared in the 
> specified partition, it never updates the PartitionManager and keeps sending 
> tons of requests with an invalid offset to the Kafka broker. Both the Storm 
> and Kafka logs grow extremely quickly during that time.
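The failure mode described here can be modelled abstractly: the corrected 
offset lives only in a local variable, so every poll re-fetches with the 
stale stored offset; the fix is to propagate the correction back into the 
partition state. This is an illustrative sketch of that shape, not the 
actual storm.kafka code:

```python
class PartitionState:
    """Minimal stand-in for the offset that PartitionManager persists."""
    def __init__(self, offset):
        self.offset = offset

def fetch(state, broker_earliest, update_manager):
    """Fetch once; on out-of-range, retry from the broker's earliest offset.

    update_manager=False models the reported bug: the corrected offset is
    local to this call, so the next call starts out of range again.
    """
    offset = state.offset
    if offset < broker_earliest:      # broker would answer OffsetOutOfRange
        offset = broker_earliest      # local fix-up before the retried fetch
        if update_manager:
            state.offset = offset     # the step the bug report says is missing
    return offset

state = PartitionState(offset=100)
# Buggy behaviour: every call re-detects the same invalid offset.
first = fetch(state, broker_earliest=500, update_manager=False)
second_start = state.offset           # still 100 -> endless invalid requests
# Fixed behaviour: the manager is updated, so the next fetch starts in range.
fixed = fetch(state, broker_earliest=500, update_manager=True)
```

With no new messages arriving to move the stored offset forward, the buggy 
variant loops on the same invalid request, which is what floods both logs.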





[jira] [Commented] (STORM-511) Storm-Kafka spout keeps sending fetch requests with invalid offset

2014-12-03 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233711#comment-14233711
 ] 

Sriharsha Chintalapani commented on STORM-511:
--

Sorry, keyboard shortcuts acted up and changed the assignee. Changed it back.






[jira] [Assigned] (STORM-511) Storm-Kafka spout keeps sending fetch requests with invalid offset

2014-12-03 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned STORM-511:


Assignee: Sriharsha Chintalapani






[jira] [Commented] (STORM-583) Add spout and bolt implementation for Azure Eventhubs

2014-12-03 Thread shanyu zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233635#comment-14233635
 ] 

shanyu zhao commented on STORM-583:
---

This is the pull request:
https://github.com/apache/storm/pull/336


> Add spout and bolt implementation for Azure Eventhubs
> -
>
> Key: STORM-583
> URL: https://issues.apache.org/jira/browse/STORM-583
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.9.3
>Reporter: shanyu zhao
>Assignee: shanyu zhao
> Fix For: 0.10.0
>
>
> Add spout and bolt implementations for Azure Eventhubs, a messaging service 
> that supports the AMQP protocol. Just like storm-kafka/storm-hbase, we need 
> to add the project to the /external folder.





[jira] [Created] (STORM-583) Add spout and bolt implementation for Azure Eventhubs

2014-12-03 Thread shanyu zhao (JIRA)
shanyu zhao created STORM-583:
-

 Summary: Add spout and bolt implementation for Azure Eventhubs
 Key: STORM-583
 URL: https://issues.apache.org/jira/browse/STORM-583
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.3
Reporter: shanyu zhao
Assignee: shanyu zhao
 Fix For: 0.10.0


Add spout and bolt implementations for Azure Eventhubs, a messaging service 
that supports the AMQP protocol. Just like storm-kafka/storm-hbase, we need 
to add the project to the /external folder.





[jira] [Comment Edited] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread Nathan Leung (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233550#comment-14233550
 ] 

Nathan Leung edited comment on STORM-561 at 12/3/14 9:29 PM:
-

Can you clarify? In this case do you mean you would use the Spring framework 
to create the spouts and bolts? I was trying to avoid tying this to a 
specific framework or format; something that may be easy in Spring may not be 
as easy in other representations such as JSON. Also, I don't think your 
example is doing this, but in general I was trying to avoid putting the onus 
of spout/bolt creation on the user.

To clarify myself, I haven't used Spring before, so a better understanding of 
it would help me understand your example. I'll take a look at the docs when I 
have a chance.


was (Author: ncleung):
Can you clarify? In this case do you mean you would use the Spring framework 
to create the spouts and bolts? I was trying to avoid tying this to a 
specific framework or format; something that may be easy in Spring may not be 
as easy in other representations such as JSON. Also, I don't think your 
example is doing this, but in general I was trying to avoid putting the onus 
of spout/bolt creation on the user.






[jira] [Commented] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread Nathan Leung (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233550#comment-14233550
 ] 

Nathan Leung commented on STORM-561:


Can you clarify? In this case do you mean you would use the Spring framework 
to create the spouts and bolts? I was trying to avoid tying this to a 
specific framework or format; something that may be easy in Spring may not be 
as easy in other representations such as JSON. Also, I don't think your 
example is doing this, but in general I was trying to avoid putting the onus 
of spout/bolt creation on the user.






[jira] [Commented] (STORM-391) KafkaSpout to await for the topic

2014-12-03 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233227#comment-14233227
 ] 

Sriharsha Chintalapani commented on STORM-391:
--

[~lewuathe] Can you make this optional via KafkaConfig, asking users whether 
they want to wait for the topic in case it doesn't exist, and provide a retry 
mechanism? I am not sure about the ZooKeeper watcher; is the plan to put a 
watcher on Kafka's ZooKeeper /topics node?
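A retry option along these lines could be sketched as follows. The ZooKeeper 
client is injected as a plain callable so the waiting logic is testable; real 
code would use an actual ZK client's exists-with-watch call rather than 
sleep-polling, and both the /brokers/topics path and the config flag name 
here are assumptions, not existing KafkaConfig fields:

```python
import time

def wait_for_topic(topic, node_exists, wait_for_topic_enabled=True,
                   retries=5, delay_s=0.01):
    """Retry until the topic's ZooKeeper node appears, instead of failing.

    node_exists: callable(path) -> bool, standing in for a ZK exists() call
    (a real implementation would register a watch instead of polling).
    """
    path = "/brokers/topics/" + topic      # assumed Kafka topic registry in ZK
    if not wait_for_topic_enabled:
        # Old behaviour: fail immediately when the topic is missing.
        if not node_exists(path):
            raise RuntimeError("topic %s does not exist" % topic)
        return True
    for _ in range(retries):
        if node_exists(path):
            return True
        time.sleep(delay_s)
    return False

# Simulate a topic that appears on the third check.
calls = {"n": 0}
def fake_exists(path):
    calls["n"] += 1
    return calls["n"] >= 3

found = wait_for_topic("clicks", fake_exists)
```

Gating the behaviour on a config flag keeps the existing fail-fast default 
available for users who prefer an immediate error.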






bolt stop receiving tuples

2014-12-03 Thread clay teahouse
Hello All,

I have this configuration:

spout -> Bolt A (emits tuples) -> Bolt B

Bolt A emits tuples successfully, but Bolt B stops receiving tuples after the 
first time (it never enters execute after the first call). The first 
execution seems to be successful. Any idea what the issue could be, or how to 
troubleshoot it?

thanks,
Clay


[jira] [Comment Edited] (STORM-582) Nimbus Halt with FileNotFoundException: '../nimbus/../stormconf.ser' dose not exists

2014-12-03 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233065#comment-14233065
 ] 

Sriharsha Chintalapani edited comment on STORM-582 at 12/3/14 2:48 PM:
---

[~Gvain] any reason for you to use /tmp for the storm-local dir? Also check 
whether TMPTIME is set, which can cause the tmp directory to be cleaned up by 
the OS automatically.


was (Author: sriharsha):
[~Gvain] any reason for you to use /tmp for storm-local dir?

> Nimbus Halt with FileNotFoundException: '../nimbus/../stormconf.ser' dose not 
> exists
> 
>
> Key: STORM-582
> URL: https://issues.apache.org/jira/browse/STORM-582
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.9.2-incubating
>Reporter: Jiahong Li
>
> Note that this is different from STORM-130: it is Nimbus that halts. We ran 
> into this problem several times; every time, it happens after several days 
> of stable running. Here is the stacktrace:
> ==
> 2014-11-03 14:30:56 b.s.d.nimbus [INFO] Cleaning inbox ... deleted: 
> stormjar-c1c856f0-cf8b-4299-9c20-712f169802b7.jar
> 2014-11-12 19:32:33 b.s.d.nimbus [ERROR] Error when processing event
> java.io.FileNotFoundException: File 
> '/tmp/storm-0.9.3/nimbus/stormdist/DNSAnalyse-7-1414992073/stormconf.ser' 
> does not exist
> at 
> org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299) 
> ~[commons-io-2.4.jar:2.4]
> at 
> org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1763) 
> ~[commons-io-2.4.jar:2.4]
> at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:324) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.daemon.nimbus$mk_assignments$iter__3100__3104$fn__3105.invoke(nimbus.clj:649)
>  ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
> at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
> ~[clojure-1.5.1.jar:na]
> at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) 
> ~[clojure-1.5.1.jar:na]
> at 
> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) 
> ~[clojure-1.5.1.jar:na]
> at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
> at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
> at 
> backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:648) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]
> at 
> backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287$fn__3288.invoke(nimbus.clj:907)
>  ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287.invoke(nimbus.clj:906)
>  ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.timer$schedule_recurring$this__2169.invoke(timer.clj:99) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.timer$mk_timer$fn__2152$fn__2153.invoke(timer.clj:50) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT] 
> at backtype.storm.timer$mk_timer$fn__2152.invoke(timer.clj:42) 
> [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> at java.lang.Thread.run(Thread.java:679) [na:1.6.0_22]
> 2014-11-12 19:32:33 b.s.util [INFO] Halting process: ("Error when processing 
> an event")
> 2014-11-12 19:32:33 b.s.d.nimbus [INFO] Shutting down master
> =
> Note that after 9 days of stable running, Nimbus halted without printing the 
> "Clean up {storm-id}" logs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-582) Nimbus Halt with FileNotFoundException: '../nimbus/../stormconf.ser' dose not exists

2014-12-03 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14233065#comment-14233065
 ] 

Sriharsha Chintalapani commented on STORM-582:
--

[~Gvain] any reason for you to use /tmp for storm-local dir?

> Nimbus Halt with FileNotFoundException: '../nimbus/../stormconf.ser' dose not 
> exists
> 
>
> Key: STORM-582
> URL: https://issues.apache.org/jira/browse/STORM-582
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 0.9.2-incubating
>Reporter: Jiahong Li
>
>  Note that this is different from STORM-130: here it is Nimbus that halts. We ran into 
> this problem several times, each time after several days of stable running. 
> Here is the stack trace:
> ==
> 2014-11-03 14:30:56 b.s.d.nimbus [INFO] Cleaning inbox ... deleted: 
> stormjar-c1c856f0-cf8b-4299-9c20-712f169802b7.jar
> 2014-11-12 19:32:33 b.s.d.nimbus [ERROR] Error when processing event
> java.io.FileNotFoundException: File 
> '/tmp/storm-0.9.3/nimbus/stormdist/DNSAnalyse-7-1414992073/stormconf.ser' 
> does not exist
> at 
> org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299) 
> ~[commons-io-2.4.jar:2.4]
> at 
> org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1763) 
> ~[commons-io-2.4.jar:2.4]
> at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:324) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.daemon.nimbus$mk_assignments$iter__3100__3104$fn__3105.invoke(nimbus.clj:649)
>  ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
> at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
> at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
> at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
> at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
> ~[clojure-1.5.1.jar:na]
> at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) 
> ~[clojure-1.5.1.jar:na]
> at 
> clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) 
> ~[clojure-1.5.1.jar:na]
> at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
> at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
> at 
> backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:648) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]
> at 
> backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287$fn__3288.invoke(nimbus.clj:907)
>  ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287.invoke(nimbus.clj:906)
>  ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.timer$schedule_recurring$this__2169.invoke(timer.clj:99) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at 
> backtype.storm.timer$mk_timer$fn__2152$fn__2153.invoke(timer.clj:50) 
> ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT] 
> at backtype.storm.timer$mk_timer$fn__2152.invoke(timer.clj:42) 
> [storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> at java.lang.Thread.run(Thread.java:679) [na:1.6.0_22]
> 2014-11-12 19:32:33 b.s.util [INFO] Halting process: ("Error when processing 
> an event")
> 2014-11-12 19:32:33 b.s.d.nimbus [INFO] Shutting down master
> =
> Note that after 9 days of stable running, Nimbus halted without printing the 
> "Clean up {storm-id}" logs.





[jira] [Updated] (STORM-582) Nimbus Halt with FileNotFoundException: '../nimbus/../stormconf.ser' dose not exists

2014-12-03 Thread Jiahong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiahong Li updated STORM-582:
-
Description: 
 Note that this is different from STORM-130: here it is Nimbus that halts. We ran into 
this problem several times, each time after several days of stable running. 
Here is the stack trace:
==
2014-11-03 14:30:56 b.s.d.nimbus [INFO] Cleaning inbox ... deleted: 
stormjar-c1c856f0-cf8b-4299-9c20-712f169802b7.jar
2014-11-12 19:32:33 b.s.d.nimbus [ERROR] Error when processing event
java.io.FileNotFoundException: File 
'/tmp/storm-0.9.3/nimbus/stormdist/DNSAnalyse-7-1414992073/stormconf.ser' does 
not exist
at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299) 
~[commons-io-2.4.jar:2.4]
at 
org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1763) 
~[commons-io-2.4.jar:2.4]
at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:324) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$mk_assignments$iter__3100__3104$fn__3105.invoke(nimbus.clj:649)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
~[clojure-1.5.1.jar:na]
at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) 
~[clojure-1.5.1.jar:na]
at 
clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) 
~[clojure-1.5.1.jar:na]
at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:648) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]
at 
backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287$fn__3288.invoke(nimbus.clj:907)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287.invoke(nimbus.clj:906)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.timer$schedule_recurring$this__2169.invoke(timer.clj:99) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at backtype.storm.timer$mk_timer$fn__2152$fn__2153.invoke(timer.clj:50) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT] 
at backtype.storm.timer$mk_timer$fn__2152.invoke(timer.clj:42) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:679) [na:1.6.0_22]
2014-11-12 19:32:33 b.s.util [INFO] Halting process: ("Error when processing an 
event")
2014-11-12 19:32:33 b.s.d.nimbus [INFO] Shutting down master
=
Note that after 9 days of stable running, Nimbus halted without printing the 
"Clean up {storm-id}" logs.

  was:
 To notice, it is different from STORM-130. It is nimbus to halt. We ran into 
this problem several times, every time it happens is after several days of 
stable running. Here is the stacktrace
==
2014-11-03 14:30:56 b.s.d.nimbus [INFO] Cleaning inbox ... deleted: 
stormjar-c1c856f0-cf8b-4299-9c20-712f169802b7.jar
2014-11-12 19:32:33 b.s.d.nimbus [ERROR] Error when processing event
java.io.FileNotFoundException: File 
'/tmp/storm-0.9.3/nimbus/stormdist/DNSAnalyse-7-1414992073/stormconf.ser' does 
not exist
at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299) 
~[commons-io-2.4.jar:2.4]
at 
org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1763) 
~[commons-io-2.4.jar:2.4]
at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:324) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$mk_assignments$iter__3100__3104$fn__3105.invoke(nimbus.clj:649)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang

[jira] [Created] (STORM-582) Nimbus Halt with FileNotFoundException: '../nimbus/../stormconf.ser' dose not exists

2014-12-03 Thread Jiahong Li (JIRA)
Jiahong Li created STORM-582:


 Summary: Nimbus Halt with FileNotFoundException: 
'../nimbus/../stormconf.ser' dose not exists
 Key: STORM-582
 URL: https://issues.apache.org/jira/browse/STORM-582
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating
Reporter: Jiahong Li


 Note that this is different from STORM-130: here it is Nimbus that halts. We ran into 
this problem several times, each time after several days of stable running. 
Here is the stack trace:
==
2014-11-03 14:30:56 b.s.d.nimbus [INFO] Cleaning inbox ... deleted: 
stormjar-c1c856f0-cf8b-4299-9c20-712f169802b7.jar
2014-11-12 19:32:33 b.s.d.nimbus [ERROR] Error when processing event
java.io.FileNotFoundException: File 
'/tmp/storm-0.9.3/nimbus/stormdist/DNSAnalyse-7-1414992073/stormconf.ser' does 
not exist
at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299) 
~[commons-io-2.4.jar:2.4]
at 
org.apache.commons.io.FileUtils.readFileToByteArray(FileUtils.java:1763) 
~[commons-io-2.4.jar:2.4]
at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$read_topology_details.invoke(nimbus.clj:324) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$mk_assignments$iter__3100__3104$fn__3105.invoke(nimbus.clj:649)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[clojure-1.5.1.jar:na]
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[clojure-1.5.1.jar:na]
at clojure.lang.RT.seq(RT.java:484) ~[clojure-1.5.1.jar:na]
at clojure.core$seq.invoke(core.clj:133) ~[clojure-1.5.1.jar:na]
at clojure.core.protocols$seq_reduce.invoke(protocols.clj:30) 
~[clojure-1.5.1.jar:na]
at clojure.core.protocols$fn__6026.invoke(protocols.clj:54) 
~[clojure-1.5.1.jar:na]
at 
clojure.core.protocols$fn__5979$G__5974__5992.invoke(protocols.clj:13) 
~[clojure-1.5.1.jar:na]
at clojure.core$reduce.invoke(core.clj:6177) ~[clojure-1.5.1.jar:na]
at clojure.core$into.invoke(core.clj:6229) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.nimbus$mk_assignments.doInvoke(nimbus.clj:648) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang.RestFn.invoke(RestFn.java:410) ~[clojure-1.5.1.jar:na]
at 
backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287$fn__3288.invoke(nimbus.clj:907)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.daemon.nimbus$fn__3281$exec_fn__1205__auto3282$fn__3287.invoke(nimbus.clj:906)
 ~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at 
backtype.storm.timer$schedule_recurring$this__2169.invoke(timer.clj:99) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at backtype.storm.timer$mk_timer$fn__2152$fn__2153.invoke(timer.clj:50) 
~[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT] 
at backtype.storm.timer$mk_timer$fn__2152.invoke(timer.clj:42) 
[storm-core-0.9.3-incubating-SNAPSHOT.jar:0.9.3-incubating-SNAPSHOT]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:679) [na:1.6.0_22]
2014-11-12 19:32:33 b.s.util [INFO] Halting process: ("Error when processing an 
event")
2014-11-12 19:32:33 b.s.d.nimbus [INFO] Shutting down master
=
Note that after 9 days of stable running, Nimbus halted without printing the 
"Clean up {storm-id}" logs.





[jira] [Updated] (STORM-391) KafkaSpout to await for the topic

2014-12-03 Thread Kai Sasaki (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kai Sasaki updated STORM-391:
-
Assignee: (was: Kai Sasaki)

> KafkaSpout to await for the topic
> -
>
> Key: STORM-391
> URL: https://issues.apache.org/jira/browse/STORM-391
> Project: Apache Storm
>  Issue Type: Improvement
>Affects Versions: 0.9.2-incubating
>Reporter: Alexey Raga
>  Labels: features
>
> When the topic does not yet exist and a consumer is asked to consume from it, 
> the default behaviour of the Kafka high-level consumer is to wait for the 
> topic without failing.
> KafkaSpout, however, currently fails trying to get partition information for a 
> topic that does not exist.
> It may be a good idea to give KafkaSpout the same behaviour. It could probably 
> be implemented with ZooKeeper watchers: if the topic does not 
> exist, set up a watcher and do nothing until it fires.
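The wait-for-the-topic pattern described above can be sketched in plain Java. This is only an illustrative model of the idea, not KafkaSpout or ZooKeeper code: `TopicWatcher` and `onTopicCreated` are hypothetical names, and in a real implementation `onTopicCreated` would be invoked by a ZooKeeper watcher registered on the topic's broker path.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: block spout activation until a "topic created"
// event fires, instead of failing when the topic is missing.
class TopicWatcher {
    private final CountDownLatch created = new CountDownLatch(1);

    // In a real implementation this callback would be driven by a
    // ZooKeeper watcher on the topic's path.
    void onTopicCreated() {
        created.countDown();
    }

    // The spout would call this before fetching partition information.
    void awaitTopic() throws InterruptedException {
        created.await();
    }

    boolean isCreated() {
        return created.getCount() == 0;
    }
}
```

The spout's open/activate path would call `awaitTopic()` rather than querying partition metadata immediately, so a missing topic stalls the spout instead of crashing it.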





[jira] [Assigned] (STORM-391) KafkaSpout to await for the topic

2014-12-03 Thread Kai Sasaki (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kai Sasaki reassigned STORM-391:


Assignee: Kai Sasaki

> KafkaSpout to await for the topic
> -
>
> Key: STORM-391
> URL: https://issues.apache.org/jira/browse/STORM-391
> Project: Apache Storm
>  Issue Type: Improvement
>Affects Versions: 0.9.2-incubating
>Reporter: Alexey Raga
>Assignee: Kai Sasaki
>  Labels: features
>
> When the topic does not yet exist and a consumer is asked to consume from it, 
> the default behaviour of the Kafka high-level consumer is to wait for the 
> topic without failing.
> KafkaSpout, however, currently fails trying to get partition information for a 
> topic that does not exist.
> It may be a good idea to give KafkaSpout the same behaviour. It could probably 
> be implemented with ZooKeeper watchers: if the topic does not 
> exist, set up a watcher and do nothing until it fires.





[jira] [Commented] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread John Reilly (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232767#comment-14232767
 ] 

John Reilly commented on STORM-561:
---

I was thinking the object itself, not a serialized representation.  I figured 
that providing the object would be the responsibility of whatever is using 
these classes, probably as part of the deserialization.

One example I can imagine is that these classes are used in Spring XML or 
something similar:





  





Although... after typing that, it feels like the creation of the spouts and 
bolts is even something that could be delegated to whatever is using these 
core classes, e.g.:




  

  

  





and 

{code}
class StormSpout {
    String name;
    ISpout spout;
    int numTasks;
    int numExecutors;
}
{code}
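The delegation idea above can be sketched in plain Java. Note the `ISpout` marker interface and `TopologySketch` helper below are simplified, hypothetical stand-ins for Storm's real APIs, used only to show that the container hands over live spout objects rather than class names:

```java
import java.util.List;

// Simplified stand-in for Storm's spout interface.
interface ISpout { }

// Holder populated by the container (e.g. Spring) with a live spout instance.
class StormSpout {
    final String name;
    final ISpout spout;      // constructed by the container, not by Storm
    final int numTasks;
    final int numExecutors;

    StormSpout(String name, ISpout spout, int numTasks, int numExecutors) {
        this.name = name;
        this.spout = spout;
        this.numTasks = numTasks;
        this.numExecutors = numExecutors;
    }
}

// Hypothetical consumer that would turn container-provided holders into a
// topology; here it just summarizes the wiring.
class TopologySketch {
    static String describe(List<StormSpout> spouts) {
        StringBuilder sb = new StringBuilder();
        for (StormSpout s : spouts) {
            sb.append(s.name).append(':').append(s.numExecutors).append(';');
        }
        return sb.toString();
    }
}
```

Under this design the topology builder never touches reflection at all; whatever deserializes the XML is responsible for producing the `ISpout` instances.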




> Add ability to create topologies dynamically
> 
>
> Key: STORM-561
> URL: https://issues.apache.org/jira/browse/STORM-561
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Nathan Leung
>Assignee: Nathan Leung
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> It would be nice if a storm topology could be built dynamically, instead of 
> requiring a recompile to change parameters (e.g. number of workers, number of 
> tasks, layout, etc).
> I would propose the following data structures for building core storm 
> topologies.  I haven't done a design for trident yet but the intention would 
> be to add trident support when core storm support is complete (or in parallel 
> if there are other people working on it):
> {code}
> // fields value and arguments are mutually exclusive
> class Argument {
> String argumentType;  // Class used to lookup arguments in 
> method/constructor
> String implementationType; // Class used to create this argument
> String value; // String used to construct this argument
> List arguments; // arguments used to build this argument
> }
> class Dependency {
> String upstreamComponent; // name of upstream component
> String grouping;
> List arguments; // arguments for the grouping
> }
> class StormSpout {
> String name;
> String klazz;  // Class of this spout
> List  arguments;
> int numTasks;
> int numExecutors;
> }
> class StormBolt {
> String name;
> String klazz; // Class of this bolt
> List  arguments;
> int numTasks;
> int numExecutors;
> List dependencies;
> }
> class StormTopologyRepresentation {
> String name;
> List spouts;
> List bolts;
> Map config;
> int numWorkers;
> }
> {code}
> Topology creation will be built on top of the data structures above.  The 
> benefits:
> * Dependency free.  Code to unmarshal from json, xml, etc, can be kept in 
> extensions, or as examples, and users can write a different unmarshaller if 
> they want to use a different text representation.
> * support arbitrary spout and bolt types
> * support for all groupings and streams via reflection
> * ability to specify the configuration map via a config file
> * reification of spout / bolt / dependency arguments
> ** recursive argument reification for complex objects
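The recursive argument reification in the last two bullets can be sketched with plain Java reflection. This is only an illustrative sketch under simplifying assumptions, not the proposed patch: it omits `argumentType` and derives constructor parameter types from the reified children's runtime classes.

```java
import java.util.List;

// Hypothetical sketch of recursive argument reification via reflection.
// Mirrors the proposed structure: 'value' and 'arguments' are mutually
// exclusive; 'implementationType' names the class to instantiate.
class Argument {
    String implementationType; // class used to create this argument
    String value;              // literal used when the argument is a String
    List<Argument> arguments;  // nested arguments for complex objects

    Object reify() throws Exception {
        if (value != null) {
            // Leaf: use the literal (real code would coerce to other types).
            return value;
        }
        // Complex object: reify children first, then pick a matching
        // constructor from their runtime types.
        int n = (arguments == null) ? 0 : arguments.size();
        Object[] args = new Object[n];
        Class<?>[] types = new Class<?>[n];
        for (int i = 0; i < n; i++) {
            args[i] = arguments.get(i).reify();
            types[i] = args[i].getClass();
        }
        return Class.forName(implementationType)
                    .getConstructor(types)
                    .newInstance(args);
    }
}
```

Because `reify()` recurses into `arguments`, arbitrarily nested constructor trees can be described purely as data and instantiated without recompilation.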





[jira] [Comment Edited] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread John Reilly (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232767#comment-14232767
 ] 

John Reilly edited comment on STORM-561 at 12/3/14 8:36 AM:


I was thinking the object itself, not a serialized representation.  I figured 
that providing the object would be the responsibility of whatever is using 
these classes, probably as part of the deserialization.

One example I can imagine is that these classes are used in Spring XML or 
something similar:
{code}




  




{code}
Although... after typing that, it feels like the creation of the spouts and 
bolts is even something that could be delegated to whatever is using these 
core classes, e.g.:
{code}



  

  

  





where StormSpout looks like 

class StormSpout {
String name;
ISpout spout;
int numTasks;
int numExecutors;
}
{code}



was (Author: jr):
I was thinking the object itself, not a serialized representation.  I figured 
that providing the object would be the responsibility of whatever is using 
these classes, probably as part of the deserialization.

One example I can imagine is that these classes are used in Spring XML or 
something similar:
{code}




  




{code}
Although... after typing that, it feels like the creation of the spouts and 
bolts is even something that could be delegated to whatever is using these 
core classes, e.g.:
{code}



  

  

  





and 

class StormSpout {
String name;
ISpout spout;
int numTasks;
int numExecutors;
}
{code}


> Add ability to create topologies dynamically
> 
>
> Key: STORM-561
> URL: https://issues.apache.org/jira/browse/STORM-561
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Nathan Leung
>Assignee: Nathan Leung
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> It would be nice if a storm topology could be built dynamically, instead of 
> requiring a recompile to change parameters (e.g. number of workers, number of 
> tasks, layout, etc).
> I would propose the following data structures for building core storm 
> topologies.  I haven't done a design for trident yet but the intention would 
> be to add trident support when core storm support is complete (or in parallel 
> if there are other people working on it):
> {code}
> // fields value and arguments are mutually exclusive
> class Argument {
> String argumentType;  // Class used to lookup arguments in 
> method/constructor
> String implementationType; // Class used to create this argument
> String value; // String used to construct this argument
> List arguments; // arguments used to build this argument
> }
> class Dependency {
> String upstreamComponent; // name of upstream component
> String grouping;
> List arguments; // arguments for the grouping
> }
> class StormSpout {
> String name;
> String klazz;  // Class of this spout
> List  arguments;
> int numTasks;
> int numExecutors;
> }
> class StormBolt {
> String name;
> String klazz; // Class of this bolt
> List  arguments;
> int numTasks;
> int numExecutors;
> List dependencies;
> }
> class StormTopologyRepresentation {
> String name;
> List spouts;
> List bolts;
> Map config;
> int numWorkers;
> }
> {code}
> Topology creation will be built on top of the data structures above.  The 
> benefits:
> * Dependency free.  Code to unmarshal from json, xml, etc, can be kept in 
> extensions, or as examples, and users can write a different unmarshaller if 
> they want to use a different text representation.
> * support arbitrary spout and bolt types
> * support for all groupings and streams via reflection
> * ability to specify the configuration map via a config file
> * reification of spout / bolt / dependency arguments
> ** recursive argument reification for complex objects





[jira] [Comment Edited] (STORM-561) Add ability to create topologies dynamically

2014-12-03 Thread John Reilly (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232767#comment-14232767
 ] 

John Reilly edited comment on STORM-561 at 12/3/14 8:34 AM:


I was thinking the object itself, not a serialized representation.  I figured 
that providing the object would be the responsibility of whatever is using 
these classes, probably as part of the deserialization.

One example I can imagine is that these classes are used in Spring XML or 
something similar:
{code}




  




{code}
Although... after typing that, it feels like the creation of the spouts and 
bolts is even something that could be delegated to whatever is using these 
core classes, e.g.:
{code}



  

  

  





and 

class StormSpout {
String name;
ISpout spout;
int numTasks;
int numExecutors;
}
{code}



was (Author: jr):
I was thinking the object itself, not a serialized representation.  I figured 
that providing the object would be the responsibility of whatever is using 
these classes, probably as part of the deserialization.

One example I can imagine is that these classes are used in Spring XML or 
something similar:





  





Although... after typing that, it feels like the creation of the spouts and 
bolts is even something that could be delegated to whatever is using these 
core classes, e.g.:




  

  

  





and 

class StormSpout {
String name;
ISpout spout;
int numTasks;
int numExecutors;
}




> Add ability to create topologies dynamically
> 
>
> Key: STORM-561
> URL: https://issues.apache.org/jira/browse/STORM-561
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Nathan Leung
>Assignee: Nathan Leung
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> It would be nice if a storm topology could be built dynamically, instead of 
> requiring a recompile to change parameters (e.g. number of workers, number of 
> tasks, layout, etc).
> I would propose the following data structures for building core storm 
> topologies.  I haven't done a design for trident yet but the intention would 
> be to add trident support when core storm support is complete (or in parallel 
> if there are other people working on it):
> {code}
> // fields value and arguments are mutually exclusive
> class Argument {
> String argumentType;  // Class used to lookup arguments in 
> method/constructor
> String implementationType; // Class used to create this argument
> String value; // String used to construct this argument
> List arguments; // arguments used to build this argument
> }
> class Dependency {
> String upstreamComponent; // name of upstream component
> String grouping;
> List arguments; // arguments for the grouping
> }
> class StormSpout {
> String name;
> String klazz;  // Class of this spout
> List  arguments;
> int numTasks;
> int numExecutors;
> }
> class StormBolt {
> String name;
> String klazz; // Class of this bolt
> List  arguments;
> int numTasks;
> int numExecutors;
> List dependencies;
> }
> class StormTopologyRepresentation {
> String name;
> List spouts;
> List bolts;
> Map config;
> int numWorkers;
> }
> {code}
> Topology creation will be built on top of the data structures above.  The 
> benefits:
> * Dependency free.  Code to unmarshal from json, xml, etc, can be kept in 
> extensions, or as examples, and users can write a different unmarshaller if 
> they want to use a different text representation.
> * support arbitrary spout and bolt types
> * support for all groupings and streams via reflection
> * ability to specify the configuration map via a config file
> * reification of spout / bolt / dependency arguments
> ** recursive argument reification for complex objects


