Re: Multiple groupBy with Trident

2014-09-18 Thread Adam Lewis
You can't emit on two streams in trident, but you can use the DSL to
split a stream into two processing paths, e.g.

Stream queryResultStream = someState.stateQuery(...);

Stream firstGroupingAggregate = queryResultStream
   .groupBy(/* first grouping */)
   .aggregate(...);

Stream secondGroupingAggregate = queryResultStream
   .groupBy(/* second grouping */)
   .aggregate(...);

I'm not entirely sure I understand the original request, but if the idea of
doing different processing on identical streams works for you, then this is
the idiom for doing that.
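If the two branches eventually need to be recombined (which I think is what the
original question was after), Trident's merge can do that, provided both
branches end with the same output fields.  A rough sketch only, not tested
code: ToKey is a hypothetical one-field function that just re-emits its input
under the common name "key", and Count/MapGet are the
storm.trident.operation.builtin classes.

TridentTopology topology = new TridentTopology();

Stream queryResultStream = topology.newStream("spout", spout)
        .stateQuery(someState, new Fields("id"), new MapGet(), new Fields("value"));

Stream firstCounts = queryResultStream
        .each(new Fields("fieldA"), new ToKey(), new Fields("key"))
        .groupBy(new Fields("key"))
        .aggregate(new Fields("key"), new Count(), new Fields("count"));

Stream secondCounts = queryResultStream
        .each(new Fields("fieldB"), new ToKey(), new Fields("key"))
        .groupBy(new Fields("key"))
        .aggregate(new Fields("key"), new Count(), new Fields("count"));

// merge() names the merged output after the first stream's fields, which is
// why both branches were normalized to ("key", "count") first.
Stream combined = topology.merge(firstCounts, secondCounts);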

On Thu, Sep 18, 2014 at 4:04 AM, Bechennec, Marion mar...@dictanova.com
wrote:

 Hi,

 Thanks for the answer, but in Trident it doesn't seem possible to emit on
 different streams from the same bolt, therefore it wouldn't be possible to
 combine the output of the 2 streams.

 2014-09-15 20:54 GMT+02:00 John Reilly j...@inconspicuous.org:

 Can you split the initial stream into 2 identical streams (using a bolt)
 and then perform a groupBy on each of the streams and then combine the
 output of those 2 groupBys?

 On Fri, Sep 12, 2014 at 9:51 AM, Bechennec, Marion mar...@dictanova.com
 wrote:

 Hi,

 For one of our applications we are trying to perform multiple groupBy
 statements on the same stream. Ideally, it would look like this:

 .stateQuery(...)
 .groupBy(new Fields("field"))
 .chainedAgg()
 .groupBy(new Fields("anotherField"))
 .chainedAgg()
 .aggregate(...)
 .chainEnd()
 .aggregate()
 .aggregate()
 .chainEnd();

 Obviously this doesn't work. We've tried several things and came up
 with something like this:
 .stateQuery(...)
 .groupBy(new Fields("field", "anotherField"))
 .aggregate(/* Do something */)
 .groupBy(new Fields("lemma"))
 .chainedAgg()
 .aggregate()
 .aggregate()
 .chainEnd();

 However, this doesn't work either: we are not able to re-emit the initial
 values on the input stream of the second groupBy statement.

 Any thoughts on how this can be accomplished?

 Thank you for your help,

 Marion






Re: Packaging Multiple Topology Jars

2014-06-26 Thread Adam Lewis
Others correct me if I'm wrong, but I believe using your multi-topology
uber jar to deploy only one topology should not affect the other running
topologies that were (in the past) deployed using an earlier version of the
same jar.  I believe the jar is stored at time of deployment under a unique
ID.  You also don't strictly need a separate main method for each
topology, assuming you parameterize your use of the StormSubmitter.  I've
done this with success: I have one main method which deploys multiple
topologies in a loop.  If a topology is already deployed, an exception is
thrown, which I catch inside the loop, and I continue on.  My dev deployment
process then becomes: kill the topologies I wish to re-deploy, then run the
one main method, which tries to submit all topos but only succeeds on the
ones which had been killed.
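A rough sketch of that loop (buildTopologies() and the names here are
placeholders for however your project parameterizes things, not code from
mine):

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.StormTopology;

public class DeployAll {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setNumWorkers(2);

        // name -> topology, built however your project does it
        for (Map.Entry<String, StormTopology> entry : buildTopologies().entrySet()) {
            try {
                StormSubmitter.submitTopology(entry.getKey(), conf, entry.getValue());
            } catch (AlreadyAliveException e) {
                // still running from an earlier deploy; skip it and keep going
                System.out.println("Skipping " + entry.getKey() + ": already alive");
            }
        }
    }

    private static Map<String, StormTopology> buildTopologies() {
        throw new UnsupportedOperationException("project-specific");
    }
}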


On Thu, Jun 26, 2014 at 2:37 PM, Sandon Jacobs sjac...@appia.com wrote:

  We currently have 1 project in GIT containing multiple topologies. With
 this model, I use 1 compiled artifact containing several “topology”
 classes, each with a main method to configure, build, and submit the
 topology. Some of these topologies share some common components (bolt
 classes, util classes, etc…).

  I do not necessarily need to deploy the newest version of each topology
 every time we release code. Here are a couple of options I have thought of:

- Break up the project into a parent module, keeping 1 GIT repo, with a
child module for common components and a child module for each topology.
- Break the common code into a GIT repo, then each topology into a GIT
repo (don't really wanna do this one at all).
- Have the gradle build create a JAR per topology, using
exclusion/inclusion in a gradle task.

 I see PROs and CONs to each approach. I am curious as to how others are
 maintaining this model. Do you have a separate compiled artifact for each
 topology? Do you use a similar approach to ours?

  Thanks in advance…



Re: persistentAggregate chaining in Trident?

2014-06-24 Thread Adam Lewis
It sounds like you want a persistentAggregate to occur before the stream is
grouped by word. Would this work, in pseudo Trident DSL:

Stream words = ...;
words.groupBy(word).persistentAggregate(...);
words.persistentAggregate(...);
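In concrete (but untested) form, with MemoryMapState standing in for whatever
state factory you actually use and "word" as the example field:

import backtype.storm.tuple.Fields;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;

public class WordCountWiring {
    public static void wire(Stream words) {
        // per-word running counts
        TridentState wordCounts = words
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

        // global running count, taken from the same upstream stream rather
        // than chained off the output of the first aggregation
        TridentState totalCount = words
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("totalCount"));
    }
}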


On Tue, Jun 24, 2014 at 12:45 PM, Can Gencer cgen...@gmail.com wrote:

 Hi all,

 I'm wondering what is the best way to chain persistent aggregations in
 Trident.

 Let's say I have a running count of words and I also want to do another
 aggregation to calculate the total count of all words using the results of
 the previous aggregation.

 I can use a persistentAggregate to calculate the running total for each
 word. However I can't use another persistentAggregate directly chained to
 that as that would only add the new value instead of decrementing the old
 value for the group first.

 What is the best solution to this scenario? Does storm have a way to
 handle this out of the box?

 Regards,
 Can



Re: Trident Binned Aggregation Design Help

2014-06-18 Thread Adam Lewis
Do you have any insight into how DRPC plays into this?  The groupBy bolt
boundary makes perfect sense and I understand how that maps to some
collection of bolts that would process different groupings (depending on
parallelism).  What stumps me are the cases where adding multiple DRPC
spouts to the topology seems to result in the whole thing being duplicated
for each spout.  I can see that some extra tracking mechanisms get stood up to
track DRPC requests and then match requests with responses, but I'm still not
sure why that wouldn't scale linearly with the number of DRPC spouts.




On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

 Not at the moment but I will be adding that functionality (trident state)
 to the storm-hbase project very soon. Currently it only supports MapState.

 -Taylor

 On Jun 17, 2014, at 6:09 PM, Andrew Serff and...@serff.net wrote:

 I'm currently using Redis, but I'm by no means tied to it.  Are there any
 examples of using either to do that?

 Andrew


 On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz ptgo...@gmail.com
 wrote:

 Andrew/Adam,

 Partitioning operations like groupBy() form the bolt boundaries in
 trident topologies, so the more you have the more bolts you will have and
 thus, potentially, more network transfer.

 What backing store are you using for persistence? If you are using
 something with counter support like HBase or Cassandra you could leverage
 that in combination with Trident's exactly-once semantics to let it handle
 the counting, and potentially greatly reduce the complexity of your
 topology.

 -Taylor

 On Jun 17, 2014, at 5:15 PM, Adam Lewis m...@adamlewis.com wrote:

 I, too, am eagerly awaiting a reply from the list on this topic.  I hit
 up against max topology size limits doing something similar with trident.
  There are definitely linear changes to a trident topology that result in
 quadratic growth of the compiled storm topology size, such as adding DRPC
 spouts.  Sadly the compilation process of trident to plain storm remains
 somewhat opaque to me and I haven't had time to dig deeper.  My work around
 has been to limit myself to one DRPC spout per topology and
 programmatically build multiple topologies for the variations (which
 results in a lot of structural and functional duplication of deployed
 topologies, but at least not code duplication).

 Trident presents a seemingly nice abstraction, but from my point of view
 it is a leaky one if I need to understand the compilation process to know
 why adding a single DRPC spout doubles the topology size.



 On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff and...@serff.net wrote:

 Is there no one out there that can help with this?  If I use this
 paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
 streams and I have like 50 spouts.  All of this adds up to a topology that I
 can't even submit because it's too large (and I've bumped the trident max
 to 50mb already...).  It seems like I'm thinking about this wrong, but I
 haven't been able to come up with another way to do it.  I don't really see
 how using vanilla Storm would help, maybe someone can offer some guidance?

 Thanks
 Andrew


 On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff and...@serff.net wrote:

 Hello,

 I'm new to using Trident and had a few questions about the best way to
 do things in this framework.  I'm trying to build a real-time streaming
 aggregation system and Trident seems to have a very easy framework to allow
 me to do that.  I have a basic setup working, but as I am adding more
 counters, the performance becomes very slow and eventually I start having
 many failures.  At the basic level here is what I want to do:

 Have an incoming stream that is using a KafkaSpout to read data from.
 I take the Kafka stream, parse it and output multiple fields.
 I then want many different counters for those fields in the data.

 For example, say it was the twitter stream.  I may want to count:
 - A counter for each username I come across.  So how many times I have
 received a tweet from each user
 - A counter for each hashtag so you know how many tweets mention a
 hashtag
 - Binned counters based on date for each tweet (i.e. how many tweets in
 2014, June 2014, June 08 2014, etc).

 The list could continue, but this can add up to hundreds of counters
 running in real time.  Right now I have something like the following:

 TridentTopology topology = new TridentTopology();
 KafkaSpout spout = new KafkaSpout(kafkaConfig);
 Stream stream = topology.newStream("messages", spout).shuffle()
         .each(new Fields("str"), new FieldEmitter(), new Fields("username", "hashtag"));

 stream.groupBy(new Fields("username"))
         .persistentAggregate(stateFactory, new Fields("username"), new Count(), new Fields("count"))
         .parallelismHint(6);
 stream.groupBy(new Fields("hashtag"))
         .persistentAggregate(stateFactory, new Fields("hashtag"), new Count(), new Fields("count"))
         .parallelismHint(6);

Re: Trident Binned Aggregation Design Help

2014-06-17 Thread Adam Lewis
I, too, am eagerly awaiting a reply from the list on this topic.  I hit up
against max topology size limits doing something similar with trident.
 There are definitely linear changes to a trident topology that result in
quadratic growth of the compiled storm topology size, such as adding DRPC
spouts.  Sadly the compilation process of trident to plain storm remains
somewhat opaque to me and I haven't had time to dig deeper.  My work around
has been to limit myself to one DRPC spout per topology and
programmatically build multiple topologies for the variations (which
results in a lot of structural and functional duplication of deployed
topologies, but at least not code duplication).

Trident presents a seemingly nice abstraction, but from my point of view it
is a leaky one if I need to understand the compilation process to know why
adding a single DRPC spout doubles the topology size.
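For the binned-counter design in the question below, one way to keep the bolt
count down (just a sketch; CounterKeyEmitter and the field names are mine, not
something from this thread) is to fan each record out into generic
(counterType, counterKey) pairs and run a single groupBy over that pair
instead of one groupBy per counter:

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CounterKeyEmitter extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // one emitted pair per logical counter for this record
        collector.emit(new Values("username", tuple.getStringByField("username")));
        collector.emit(new Values("hashtag", tuple.getStringByField("hashtag")));
        // date bins would be emitted here too, e.g. ("date", "2014-06-08")
    }
}

// wiring: a single grouped aggregation then covers every counter type
stream.each(new Fields("username", "hashtag"), new CounterKeyEmitter(),
            new Fields("counterType", "counterKey"))
      .groupBy(new Fields("counterType", "counterKey"))
      .persistentAggregate(stateFactory, new Count(), new Fields("count"))
      .parallelismHint(6);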



On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff and...@serff.net wrote:

 Is there no one out there that can help with this?  If I use this
 paradigm, my topology ends up having like 170 bolts.  Then I add DRPC
 streams and I have like 50 spouts.  All of this adds up to a topology that I
 can't even submit because it's too large (and I've bumped the trident max
 to 50mb already...).  It seems like I'm thinking about this wrong, but I
 haven't been able to come up with another way to do it.  I don't really see
 how using vanilla Storm would help, maybe someone can offer some guidance?

 Thanks
 Andrew


 On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff and...@serff.net wrote:

 Hello,

 I'm new to using Trident and had a few questions about the best way to do
 things in this framework.  I'm trying to build a real-time streaming
 aggregation system and Trident seems to have a very easy framework to allow
 me to do that.  I have a basic setup working, but as I am adding more
 counters, the performance becomes very slow and eventually I start having
 many failures.  At the basic level here is what I want to do:

 Have an incoming stream that is using a KafkaSpout to read data from.
 I take the Kafka stream, parse it and output multiple fields.
 I then want many different counters for those fields in the data.

 For example, say it was the twitter stream.  I may want to count:
 - A counter for each username I come across.  So how many times I have
 received a tweet from each user
 - A counter for each hashtag so you know how many tweets mention a hashtag
 - Binned counters based on date for each tweet (i.e. how many tweets in
 2014, June 2014, June 08 2014, etc).

 The list could continue, but this can add up to hundreds of counters
 running in real time.  Right now I have something like the following:

 TridentTopology topology = new TridentTopology();
 KafkaSpout spout = new KafkaSpout(kafkaConfig);
 Stream stream = topology.newStream("messages", spout).shuffle()
         .each(new Fields("str"), new FieldEmitter(), new Fields("username", "hashtag"));

 stream.groupBy(new Fields("username"))
         .persistentAggregate(stateFactory, new Fields("username"), new Count(), new Fields("count"))
         .parallelismHint(6);
 stream.groupBy(new Fields("hashtag"))
         .persistentAggregate(stateFactory, new Fields("hashtag"), new Count(), new Fields("count"))
         .parallelismHint(6);

 Repeat something similar for everything you want to have a unique count
 for.

 I end up having hundreds of groupBys, each with its own aggregator.  I have
 so far only run this on my local machine and not on a cluster yet, but I'm
 wondering if this is the correct design for something like this, or if there
 is a better way to distribute this within Trident to make it more efficient.

 Any suggestions would be appreciated!
 Thanks!
 Andrew





Re: Trident Stream Join with Self

2014-04-25 Thread Adam Lewis
Since you are doing a self join you don't need to actually use the Trident
join, or the multireducer on which it is based.  You could group the stream
on your join key, then write an aggregator which collects all the tuples in
each group and emits the cross product at the end of each batch (or in a
streaming fashion, where each incremental tuple emits the cross of that
tuple with all the tuples already received); then finally implement a
filter function downstream of each aggregate's output.  Your aggregator will
have to take care of the * in your SQL example, since typically
aggregators only keep the join key plus the aggregated value.
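A sketch of what that aggregator could look like (the field names follow the
SQL example below and are assumptions on my part):

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CrossProductAggregator extends BaseAggregator<List<TridentTuple>> {

    @Override
    public List<TridentTuple> init(Object batchId, TridentCollector collector) {
        return new ArrayList<TridentTuple>();   // per-group, per-batch buffer
    }

    @Override
    public void aggregate(List<TridentTuple> state, TridentTuple tuple,
                          TridentCollector collector) {
        state.add(tuple);                        // collect the group's tuples
    }

    @Override
    public void complete(List<TridentTuple> state, TridentCollector collector) {
        // emit the cross product of the group at the end of the batch
        for (TridentTuple left : state) {
            for (TridentTuple right : state) {
                collector.emit(new Values(left.getValueByField("field2"),
                                          right.getValueByField("field2")));
            }
        }
    }
}

Wired up as something like stream.groupBy(new Fields("field1")).aggregate(new
Fields("field1", "field2"), new CrossProductAggregator(), new Fields("left",
"right")), followed by an each(...) with a filter that keeps only the pairs
where one side equals the known value.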



On Thu, Apr 24, 2014 at 5:46 PM, Charles LeDoux
charles.a.led...@gmail.comwrote:

 Is it possible to join a trident stream with itself?

 My particular use case is that I want to take the cross product of all the
 incoming tuples for a batch and then only keep the joined tuples containing
 a known value.

 I believe the SQL for what I am trying to accomplish is:

 SELECT * FROM table AS t1 JOIN table AS t2 ON field1 WHERE t1.field2 =
 known value;

 My intention was to do a self join on my stream and then run the now
 joined stream through a filter.

 Thanks,
 Charles

 --
 PhD Candidate; University Fellow
 University of Louisiana at Lafayette
 Center for Advanced Computer Studies
 http://charlesledoux.com




Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917

2014-04-21 Thread Adam Lewis
Are there other things that could cause this error?  Since upgrading to
0.9.1-incubating, I've hit it twice.  The first time I resolved it (in one
project) by fixing an issue where two slf4j bindings were on the classpath
together (strange, but it worked)...now I'm hitting the problem again in a
different project and can't figure out what is causing the problem.  This
is for a test which is submitting a topology to a LocalCluster; the full
trace follows (happens launching JUnit from Eclipse and from Maven command
line)

java.lang.RuntimeException: java.lang.ClassNotFoundException:
backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
 at backtype.storm.utils.Utils.deserialize(Utils.java:88)
 at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89)
at backtype.storm.daemon.nimbus$start_storm.invoke(nimbus.clj:724)
 at
backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopologyWithOpts(nimbus.clj:962)
 at
backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopology(nimbus.clj:971)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
 at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
 at backtype.storm.testing$submit_local_topology.invoke(testing.clj:253)
at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:34)
 at backtype.storm.LocalCluster.submitTopology(Unknown Source)
 at
com.acuitysds.trident.TestTimeseriesAssembly.testBasicTopology(TestTimeseriesAssembly.java:108)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
 at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
 at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
 at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
 at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
 at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.lang.ClassNotFoundException:
backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
 at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 at 

Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917

2014-04-21 Thread Adam Lewis
The tests are in the test tree (which I don't think eclipse cares about,
but should get treated properly on the command line).  The scope wasn't
"provided"; it had defaulted to runtime...and I've now corrected that and it
doesn't help.

Strangely, if I explicitly exclude logback-classic within the storm
dependency, it does work as long as I also add a dependency on
slf4j-simple...so I am still seeing weird logger-induced classpath
issues. This is what is working for me:

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <exclusions>
    <exclusion>
      <artifactId>logback-classic</artifactId>
      <groupId>ch.qos.logback</groupId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
</dependency>

WEIRD!



On Mon, Apr 21, 2014 at 8:54 PM, Jon Logan jmlo...@buffalo.edu wrote:

 Are your maven scopes right? The scope of the Storm dependency should be
 provided -- not runtime. Also be sure that your main method / unit test is
 under your test/ classpath, not your main/ classpath.


 On Mon, Apr 21, 2014 at 8:49 PM, Adam Lewis m...@adamlewis.com wrote:

 Are there other things that could cause this error?  Since upgrading to
 0.9.1-incubating, I've hit it twice.  The first time I resolved it (in one
 project) by fixing an issue where two slf4j bindings were on the classpath
 together (strange, but it worked)...now I'm hitting the problem again in a
 different project and can't figure out what is causing the problem.  This
 is for a test which is submitting a topology to a LocalCluster; the full
 trace follows (happens launching JUnit from Eclipse and from Maven command
 line)

 java.lang.RuntimeException: java.lang.ClassNotFoundException:
 backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
  at backtype.storm.utils.Utils.deserialize(Utils.java:88)
  at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89)
 at backtype.storm.daemon.nimbus$start_storm.invoke(nimbus.clj:724)
  at
 backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopologyWithOpts(nimbus.clj:962)
  at
 backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopology(nimbus.clj:971)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
 at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
  at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
  at backtype.storm.testing$submit_local_topology.invoke(testing.clj:253)
 at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:34)
  at backtype.storm.LocalCluster.submitTopology(Unknown Source)
  at
 com.acuitysds.trident.TestTimeseriesAssembly.testBasicTopology(TestTimeseriesAssembly.java:108)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
  at
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
  at
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
  at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
  at
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
  at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
 at
 org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
  at
 org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
  at
 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
  at
 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
  at
 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
  at
 org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
 Caused by: java.lang.ClassNotFoundException:
 backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
 at java.net.URLClassLoader$1

Re: Local Cluster serialization

2014-03-31 Thread Adam Lewis
From my observations, serialization is going to occur only if you have
parallelism in your LocalCluster (more than one worker thread).  I've also
found that when going from LocalCluster to the real thing, there will
always be a couple of unexpected twists and some tuning to do.

Also, make sure to turn off java serialization fallback (
Config#setFallBackOnJavaSerialization(false)) to make sure Kryo is working
properly for all your types.
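A minimal sketch of that setup, assuming a buildTopology() helper and a
hypothetical JValueKryoSerializer for the Json4s type in question:

Config conf = new Config();
conf.setFallBackOnJavaSerialization(false);   // unregistered types now fail instead of quietly using Java serialization
conf.registerSerialization(JValue.class, JValueKryoSerializer.class);
conf.setNumWorkers(2);                        // per the note above, give it some parallelism

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("serialization-check", conf, buildTopology());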


On Sat, Mar 29, 2014 at 5:24 PM, János Háber janos.ha...@finesolution.huwrote:

 Hi,

 I have a little question. Is serialization enabled in LocalCluster mode?
 For example, if I emit a Json4s JValue type, is it serialized by Kryo? How can I
 check without releasing to a normal cluster?

 Thanks

 b0c1



Re: DI with Storm

2014-03-27 Thread Adam Lewis
Yes that is exactly right, the submission to Nimbus is in the form of a big
thrift message describing the topology...this message includes java
serialized blobs of your topology components (spouts/bolts). They get
instantiated within the VM calling StormSubmitter.  Typically you would
pass configuration info to the constructor, but dependencies (e.g. DB
connection pool, etc) are transient fields.  Then in the prepare method
(called after deserialization on the worker) you use the serialized
configuration fields to initialize the transient ones.  Of course Guice
fits naturally into that step.
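A rough sketch of that shape (DataDao, InjectorProvider and the config map are
placeholders; only the config survives serialization, the rest is rebuilt in
prepare()):

import java.util.Map;

import com.google.inject.Inject;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PersistingBolt extends BaseRichBolt {

    private final Map<String, Object> daoConfig;    // serializable, set at topology build time
    @Inject private transient DataDao dao;          // heavyweight dependency, injected per worker
    private transient OutputCollector collector;

    public PersistingBolt(Map<String, Object> daoConfig) {
        this.daoConfig = daoConfig;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // injector is built lazily on the worker from the serialized config
        InjectorProvider.get(daoConfig).injectMembers(this);
    }

    @Override
    public void execute(Tuple input) {
        dao.save(input.getValues());
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing emitted downstream
    }
}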


On Thu, Mar 27, 2014 at 12:37 AM, Software Dev static.void@gmail.comwrote:

 Ok so you would configure the map in the main method before submitting
 the topology. Then this conf can be used to create guice injectors. Is
 that correct?

 In the book Getting Started with Storm it states:

 To customize a bolt, you should set parameters in its constructor and
 save them as instance variables so they will be serialized when
 submitting the bolt to the cluster.

 Does this mean bolts are instantiated on the client side before being
 submitted to nimbus/cluster?

 On Wed, Mar 26, 2014 at 2:05 PM, Svend Vanderveken
 svend.vanderve...@gmail.com wrote:
 
  The storm configuration map is part of the arguments received by each
  prepare() method, in most Storm primitives, on each worker. It's
 serialised
  to each worker when a topology instance is started there. The initial
 storm
  configuration map is provided at deploy time to Nimbus, in the class
  containing the main() method, specified in the storm jar blabla.jar
  some.class.here command.
 
 
 
 
  On Wed, Mar 26, 2014 at 4:42 PM, Software Dev static.void@gmail.com
 
  wrote:
 
  How does one get the configuration map to each worker?
 
  On Wed, Mar 26, 2014 at 6:41 AM, Adam Lewis m...@adamlewis.com wrote:
   Or, since this is only being called from prepare at startup anyway,
   simpler:
  
   public class InjectorProvider {
  
   private static Injector injector;
   public static synchronized Injector get(Map conf) {
   if (injector == null) {
   injector = Guice.createInjector(
   new DAOModule(conf),
   new S3Module(conf));
   }
  
   return injector;
   }
   }
  
  
  
  
  
   On Wed, Mar 26, 2014 at 9:26 AM, Svend Vanderveken
   svend.vanderve...@gmail.com wrote:
  
private static Injector injector;
  
   or better:
  
   private static volatile Injector injector;
  
  
   see
  
 http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
  
  
   On Tue, Mar 25, 2014 at 9:55 PM, Patricio Echagüe 
 patric...@gmail.com
   wrote:
  
   It's fine. You can synchronize with a static monitor the creation on
   the
   injector per worker. That's how I do it.
  
   public class InjectorProvider {
  
   private static Injector injector;
  
   public static Injector get() {
   if (injector == null) {
   synchronized (InjectorProvider.class) {
   if (injector == null) {
    injector = Guice.createInjector(
            new DAOModule(),
            new S3Module());
   }
   }
   }
  
   return injector;
   }
  
  
   On Tue, Mar 25, 2014 at 6:24 PM, Adam Lewis m...@adamlewis.com
   wrote:
  
  
   Doesn't Storm 0.9 have a prepare for the worker?
  
  
   No, I don't think it does, but please point this out if I'm
 mistaken.
   I
   found the right JIRA issue though:
   https://issues.apache.org/jira/browse/STORM-126
  
   Seems like the patch was well along but hasn't seen any recent
   activity.
  
  
  
  
 
 



Re: date time in tuple

2014-03-27 Thread Adam Lewis

 Also it might be worth reading:
 https://github.com/nathanmarz/storm/wiki/Serialization


After which you'll seek out this library:
https://github.com/magro/kryo-serializers
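Roughly, with that library on the classpath (the field name is just an
example):

Config conf = new Config();
conf.registerSerialization(DateTime.class, JodaDateTimeSerializer.class);  // de.javakaffee.kryoserializers.jodatime

// later, inside a bolt's execute(Tuple tuple):
DateTime created = (DateTime) tuple.getValueByField("myDateTimeField");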


On Thu, Mar 27, 2014 at 11:09 AM, Dan Guja dang...@gmail.com wrote:

 Also it might be worth reading:
 https://github.com/nathanmarz/storm/wiki/Serialization


 On Thu, Mar 27, 2014 at 9:01 AM, Dan Guja dang...@gmail.com wrote:

 Try this:
 (DateTime)tuple.getValueByField(myDateTimeFieldName);


 On Thu, Mar 27, 2014 at 8:50 AM, michael campbell 
 michael.campb...@dsl.pipex.com wrote:


 How do you put a datetime, let's say a jodatime datetime value, in a
 tuple?

 How do you get a datetime out of a tuple, what sort of method
 corresponds to tuple.getLongByField for a datetime?

 Michael Campbell

 --







Re: DI with Storm

2014-03-25 Thread Adam Lewis
As I recall there was an open issue for adding all the needed lifecycle
hooks for integrating cleanly with something like Guice (worker lifecycle
callbacks?) but I can't find it now...maybe STORM-172
.

In any case,
I think there is an issue with not having quite enough
exposed
from storm to properly integrate things like Guice.
Having said that,
I'm using
Guice now, but it is basically a hack until I can find a better way.

The key to my hack is that injectors are not serializable, but modules are.
 I use the singleton pattern with static accessor to hold the injector
which is created lazily (i.e. after topology components have been
deserialized and are running in the worker VM).  I instantiate my guice
module during topology build and it gets serialized as part of the topo
definition.  In my storm components, I pass the module to the singleton
injector factory which creates the injector from the passed module if one
doesn't exist in the VM, and just returns the existing injector otherwise.
 Finally, with an injector in hand for the prepare method, I can then
either do a injector.getInstance() to create objects (e.g. inside a trident
state factory) or injector.injectMembers(this) in a prepare method if my
topology component has transient @Inject fields for dependencies (since
storm controls the object lifecycle and I can't use constructor injection
there)

The biggest problem with this hack is it assumes that all module instances
are interchangeable (and identical) and the first component to init gets to
build the injector.  With the right lifecycle hooks, the injector could be
created in a cleaner way.


On Tue, Mar 25, 2014 at 12:45 PM, Software Dev static.void@gmail.com
wrote:

 How would you go about use DI (Google Guice) with a Storm cluster? The
 worker nodes themselves will not have access to the Injector instance
 after job submission.

 Thanks


Re: serialization exception when using multiple worker/supervisors

2014-03-25 Thread Adam Lewis
You can register non-visible classes in code, but it is a pain.  E.g. I had
the issue with a Guava type which happened to be java serializable but had no
easy way to implement a kryo serializer (and I don't like auto fall-back on
java serialization within storm since I prefer to know when things aren't
getting handled by kryo):

List serializers = (List) stormConf.get(Config.TOPOLOGY_KRYO_REGISTER);

serializers.add(Collections.singletonMap(
        "com.google.common.collect.RegularImmutableList",
        SerializableSerializer.class.getName()));




On Tue, Mar 25, 2014 at 11:56 AM, Vinay Pothnis vinay.poth...@gmail.comwrote:

 Naresh,

 Cannot do that in code because I cannot reference the private class to
 register.

 Samit,

 This is actually being used indirectly. I use the 'storm-rabbitmq' library (
 https://github.com/ppat/storm-rabbitmq) and that internally initializes
 the rabbit mq ConnectionFactory. As part of that initialization, the
 default client properties are added and that is where the
 ByteArrayLongString class is being referenced.


 http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/2.5.1/com/rabbitmq/client/impl/AMQConnection.java#AMQConnection.defaultClientProperties%28%29

 So, kinda stuck here.

 Thanks
 Vinay


 On Tue, Mar 25, 2014 at 6:18 AM, Naresh naresh.bha...@gmail.com wrote:

 Hey Vinay,
 Did you specify the kryo serialization parameter in all the yaml files on
 different supervisors? An easier way is to specify this in the code as
  Srinath had suggested.
 Regards
 Naresh

 On Mar 25, 2014, at 12:12 AM, Samit Sasan sasansa...@gmail.com wrote:

 Hey Vinay,

 Sorry to hear that ... before we delve into workarounds, can you describe
 what object exactly you are passing in the tuple, and can you extract your
 needed info into your custom DS object and use that instead?

 -Samit
 On Tue, Mar 25, 2014 at 2:38 AM, Vinay Pothnis 
 vinay.poth...@gmail.comwrote:

 com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString







Re: DI with Storm

2014-03-25 Thread Adam Lewis
Yeah, that's basically what I do.  My point about being hackish is that if
your module classes need configuration, you need to have that same
configuration available from every call site of InjectorProvider#get()


On Tue, Mar 25, 2014 at 9:55 PM, Patricio Echagüe patric...@gmail.comwrote:

 It's fine. You can synchronize with a static monitor the creation on the
 injector per worker. That's how I do it.

 public class InjectorProvider {

 private static Injector injector;

 public static Injector get() {
 if (injector == null) {
 synchronized (InjectorProvider.class) {
  if (injector == null) {
  injector = Guice.createInjector(
          new DAOModule(),
          new S3Module());
 }
 }
 }

 return injector;
 }


 On Tue, Mar 25, 2014 at 6:24 PM, Adam Lewis m...@adamlewis.com wrote:


 Doesn't Storm 0.9 have a prepare for the worker?


 No, I don't think it does, but please point this out if I'm mistaken.  I
 found the right JIRA issue though:
 https://issues.apache.org/jira/browse/STORM-126

 Seems like the patch was well along but hasn't seen any recent activity.





Re: Storm/Trident as a distributed query engine?

2014-03-21 Thread Adam Lewis
When evaluating storm, definitely take a closer look at the DRPC mechanism
in Trident for your use case.  To my knowledge there is no current support
for data locality like you describe with Cassandra, although there was a
discussion on the mailing list a couple of months ago around someone
looking to do a school project and one of the popular suggestions was to
implement a state-location-aware partitioning.

As far as other projects, have you taken a look at druid (http://druid.io/)?
 It would represent an alternative to Cassandra in your current setup but
is more suited to the types of multi-dimensional querying and aggregates
you describe and can ingest sensor data in batch or realtime.
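To give a flavor of the Trident DRPC idiom, a rough sketch (SplitArgs and the
field names are placeholders; counts would be a TridentState holding the
pre-aggregated per-sensor values):

import backtype.storm.LocalDRPC;
import backtype.storm.tuple.Fields;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;

public class SensorQueryWiring {
    public static void addDrpcQuery(TridentTopology topology, TridentState counts, LocalDRPC drpc) {
        topology.newDRPCStream("sensor-sum", drpc)
                .each(new Fields("args"), new SplitArgs(), new Fields("sensorId"))
                .groupBy(new Fields("sensorId"))
                .stateQuery(counts, new Fields("sensorId"), new MapGet(), new Fields("partial"))
                .each(new Fields("partial"), new FilterNull())
                .aggregate(new Fields("partial"), new Sum(), new Fields("sum"));
    }
}

The query then behaves like a synchronous call: the DRPC spout fans the
request out over the cluster, the stateQuery reads the pre-aggregated values,
and the final aggregate collapses the partial results into one answer per
request.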


On Fri, Mar 21, 2014 at 10:21 AM, Simon Chemouil schemo...@gmail.comwrote:

 Hi,

 I am very new to Storm and trying to evaluate whether it fits my needs
 or not. I work on a project where we compute reasonably simple queries
 (sum, average, mean, percentile...) on large amount of very simple
 structured data (counters of many sensors with a value every 5 minutes).
 We are currently reaching the limit of our architecture (still MySQL
 based) and moving to Cassandra for our data store. We want to also
 parallelize the queries to run on a cluster to be able to answer the
 queries as fast as possible.

 While Cassandra seems to be a good fit for our needs of data storage
 (quick access, good write performance, fault-tolerant, ...), we're still
 looking for a component which could help us distribute our queries over
 a cluster. I've been looking at Storm/Trident and running some
 tests/examples for the last few days, and while I do believe we could
 make it happen, I would like to have the opinion of an experienced
 Storm user/dev to know if it truly makes sense for our problem, since we
 don't really have a continuous stream of data.

 First, in the short-term, we want to run simple queries over the
 Cassandra store. I envision things this way:
 query --> [ QUEUE ] --> [ distribute/process queries ] --> answer/output

 Queries are a discrete events, we don't want to keep state between them.

 We have some very simple queries and some more complex that require
 going through a lot of data (tens of millions of 'cells'), so we want to
 be able to *cut down* big queries in smaller pieces (most probably
 divide them by time range) both to reply faster and to prevent big
 queries from taking all resources.

 We would like to send the results of the query straight into another
 Cassandra CF and to an endpoint in our system.

 Finally, because of some non-technical business requirements (i.e., our
 clients' IT team's reluctance to give us more servers ;))  we will have to
 host the 'workers' on the same servers as Cassandra nodes. I thought it
 could make sense to use Cassandra's token aware policy to always try to
 make workers fetch data locally. This would allow us to piggyback on
 Cassandra's load balancing since we use random partitioning that
 normally evenly distributes the rows across our cluster, and a row is
 small enough to compute on without breaking the task down further. Is it
 possible with Storm/Trident to direct the way computations are
 distributed (i.e, to which worker 'tasks' are sent) or is it going
 against its design?

 All-in-all, how good a fit is Storm for this use case? What about
 Trident? If the project isn't a good fit, do you know of other
 open-source projects that address this need? The current alternative I
 envision is designing a homebrew solution using Akka. Any opinion is
 greatly appreciated!

 Thanks a lot for your help!

 Simon Chemouil



Re: User Interface

2014-03-20 Thread Adam Lewis
I really like this; I've been wondering about the easiest way to visualize
the storm topology graph; especially since I'm a heavy trident user so my
topologies are generated...with really long names for the bolts.  What
you're showing here solves that and adds operational metrics to boot, which
is great.  Have you considered building the backend on top of the existing
storm UI app?  You might be able to leverage existing code to talk to
nimbus.  It would be awesome if storm provided something like what you've
demoed out of the box on the same server infrastructure.
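For the talking-to-nimbus part, a rough sketch of sampling Nimbus over Thrift
with the stock generated client (the class name is just a placeholder):

import java.util.Map;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.TopologySummary;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class NimbusSampler {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();                    // reads storm.yaml / defaults
        Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();

        ClusterSummary cluster = client.getClusterInfo();
        for (TopologySummary topo : cluster.get_topologies()) {
            System.out.println(topo.get_name() + " [" + topo.get_status() + "] "
                    + topo.get_num_executors() + " executors");
        }
    }
}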


On Thu, Mar 20, 2014 at 8:28 AM, Middleton, Jonathan 
jonathan.middle...@citi.com wrote:

  Very pretty. Hope you carry this forward.



 You might consider sampling from Nimbus using Thrift.



 *From:* Klausen Schaefersinho [mailto:klaus.schaef...@gmail.com]
 *Sent:* Thursday, March 20, 2014 8:21 AM
 *To:* user@storm.incubator.apache.org
 *Subject:* Re: User Interface



 Hi,



 it is just a prototype yet with no backend... So i still have to decide
 what backend to use...



 Cheers,



 Klaus



 On Thu, Mar 20, 2014 at 1:15 PM, Jean-Sebastien Vachon 
 jean-sebastien.vac...@wantedanalytics.com wrote:

 Looks very promising. I like the fact that you see the flow of tuples
 between elements.



 Any idea when you will release this so that we can try  it out with our
 topology?



 *From:* Klausen Schaefersinho [mailto:klaus.schaef...@gmail.com]
 *Sent:* March-18-14 4:41 PM
 *To:* user@storm.incubator.apache.org
 *Subject:* User Interface



 Hi,



 I have been prototyping an alternative UI for monitoring a storm cluster.
 The main driver was that I would like to have a more condensed view of
 what is going on in my cluster and that I would also like to monitor some
 use case specific metrics, e.g. the training accuracy of a classifier or
 so. I have put a dummy online:



 http://vommond.de/streamui/streamui.html



 http://vommond.de/streamui/streamui-dark.html



 The width and the color of the lines between the boxes correlate to the
 number of events flowing between the spouts and bolts. The widgets in the
 boxes show some other (dummy) metrics that one might for instance monitor.
 You can customize the UI quite a lot, e.g. hide widgets or change their
 type (but all changes are lost in the dummy).



 Before I move on, I have some questions:



 1) For now I have not integrated the UI with any backend. So before I
 start, I would be happy if

 anybody could give me some advice on what kind of monitoring backend to use.
 Ideally the backend would be:



 - Easy to integrate with storm, and

 - Could also read the normal storm metrics.

 - Have custom metrics variables to monitor.



 Also I do not have too much time, so any kind of solution that has a REST
 service and would not require any server side hacking would be a big plus.



  2) I saw that quite a few people are using Ganglia or Nagios, but I have
 never used either. Can anybody share their experience?





 Of course, all comments with regard to the UI are welcome.



 I am planning to release the source code in a few days (after some cleaning
 up), so anybody is welcome to pick it up.





 Cheers,



 Klaus









Re: Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size

2014-03-20 Thread Adam Lewis
Robert--

I just saw this thread now.  I had the same issue recently myself.  Taylor
posted the full conversation we had back to the list, but you may not have
noticed it since it was several messages squashed into one.

In any case, I think you can fix this by increasing the max allowable
buffer on Nimbus in storm.yaml using the config option
nimbus.thrift.max_buffer_size (it is specified in bytes).  The default is
something like 1MB; I upped mine to 32MB to accommodate a 6MB topology.  It
looks like your topology is only about 2MB so you might not need to go that
extreme.  I think the only downside of increasing this is that you lose the
protection it was designed to provide: namely a malicious/buggy client
connecting to Nimbus can exhaust Nimbus memory by sending really big thrift
messages.
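For reference, the setting goes in storm.yaml on the nimbus host, with the
value in bytes, e.g.:

nimbus.thrift.max_buffer_size: 33554432  # 32MB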



On Thu, Mar 13, 2014 at 11:44 AM, Robert Lee lee.robert...@gmail.comwrote:

 Downgrading to storm-0.9.0.1 remedied the issue for me. I no longer
 receive any exceptions. I'm fairly certain nothing is listening to the
 thrift port besides the storm jar command I issue. The only thing I can
 think of is I am connected to the nimbus node to view the logs in real
 time. When I have finished other things on my plate, I'll revisit this
 issue to try to find a cause.


 On Wed, Mar 12, 2014 at 2:56 PM, P. Taylor Goetz ptgo...@gmail.comwrote:

 Hi Robert, let me know if you experience the same issue with 0.9.0.1. One
 thing that caught my eye was this:

 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size
 of 2064605, which is bigger than the maximum allowable buffer size for ALL
 connections.

 In 0.9.1 that can indicate that something other than the thrift client
 (like ssh, telnet, or a security scanner) is accessing nimbus' thrift port.
 More details here (https://github.com/apache/incubator-storm/pull/3).

 The fact that you can run a different topology class from the same jar
 file suggests that it is not a problem with the upload process.

 Is it possible that something in your topology (like a cassandra client,
 etc.) is misconfigured to point to nimbus' host and port?

 - Taylor

 On Mar 11, 2014, at 7:48 PM, Robert Lee lee.robert...@gmail.com wrote:

 I will downgrade to storm-0.9.0.1 and see if the error persists in that
 version as well.


 On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee lee.robert...@gmail.comwrote:

 Yes -- more details:

 Storm version: 0.9.1-incubating installed using a variant of your
 storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant).

 Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m,
 zookeeper (3.3.5) 512mb node, and a kafka (0.8.0) 512mb node. Persisting to
 a local cassandra cluster.

 Here's an example topology I'm running. This topology works both in
 local and distributed mode. A variant of this topology (more persisting and
 more complicated functions on the kafka stream) works in local mode but
 gives the thrift error reported above when submitting.

 public class SentenceAggregationTopology {

     private final BrokerHosts brokerHosts;

     public SentenceAggregationTopology(String kafkaZookeeper) {
         brokerHosts = new ZkHosts(kafkaZookeeper);
     }

     public StormTopology buildTopology() {
         return buildTopology(null);
     }

     public StormTopology buildTopology(LocalDRPC drpc) {
         TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm");
         kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
         TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
         KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist", "testtable", "word", "count");
         TridentTopology topology = new TridentTopology();

         TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle()
                 .each(new Fields("str"), new WordSplit(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .persistentAggregate(
                         CassandraBackingMap.nonTransactional(mapper),
                         new Count(), new Fields("aggregates_words"))
                 .parallelismHint(2);

         topology.newDRPCStream("words", drpc)
                 .each(new Fields("args"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                 .each(new Fields("count"), new FilterNull())
                 .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

         return topology.build();
     }

     public static void main(String[] args) throws Exception {
         final int TIME_INTERVAL_IN_MILLIS = 1000;

         String kafkaZk = args[0];
         SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk);

         Config config = new Config();
         config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
 

nimbus.thrift.max_buffer_size

2014-03-18 Thread Adam Lewis
Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new
thrift max buffer size (nicely logged on the server side, although the
client just gets a broken pipe stack trace form thrift) with an approx 6 MB
message(!).  Increasing the configured limit solves the problem, but I
would have thought the 1MB default should be enough.

Does the storm submitter encode the entire topology as a single thrift
message?  I'm really surprised that the message is coming out so large, my
topology isn't exactly small, but it only has about 20 bolts...does anyone
have any suggestions on how to determine why the message is so large?  Is
this within the realm of what others have seen or am I doing something
wrong?

Thanks,
Adam


Re: nimbus.thrift.max_buffer_size

2014-03-18 Thread Adam Lewis
It isn't the jar file, but something about the topology itself; I have a
submitter program that submits four topologies all from the same jar.  Upon
submitting the first topology, the jar is uploaded and topology starts,
then the submitter submits two more topologies whilst reusing the
uploaded jar.  The broken pipe occurs when trying to submit the fourth
(large) topology.  That is why I was assuming the large message was
actually the encoded topology itself.  This is reproducible and the errors
are as follows:

nimbus.log:

2014-03-18 18:16:39 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of
 6644632, which is bigger than the maximum allowable buffer size for ALL
 connections.


storm jar console:

 2321 [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar
 /Users/adam/git/impl/impl-storm/target/impl-storm-0.0.1-SNAPSHOT.jar to
 assigned location:
 /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar

 97762 [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded
 topology jar to assigned location:
 /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar

 97762 [main] INFO  backtype.storm.StormSubmitter - Submitting topology
 global__topo_forecastRuntime in distributed mode with conf
 {topology.fall.back.on.java.serialization:false,topology.workers:2,drpc.servers:[10.118.57.229],topology.debug:false,topology.kryo.register:[{org.joda.time.DateTime:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},{org.joda.time.Interval:de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer},com.mycompany.data.Simulated,com.mycompany.data.SomeClass1,com.mycompany.ml.SomeClass2,com.mycompany.model.SomeClass3,com.mycompany.model.SomeClass4,{com.mycompany.ml.SomeClass4:com.esotericsoftware.kryo.serializers.DefaultSerializers$EnumSerializer},{java.math.BigDecimal:com.esotericsoftware.kryo.serializers.DefaultSerializers$BigDecimalSerializer},{java.sql.Date:de.javakaffee.kryoserializers.DateSerializer},{com.tdunning.math.stats.TDigest:com.mycompany.trident.tdigest.TDigestSerializer},{java.lang.Class:com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer},{java.util.UUID:de.javakaffee.kryoserializers.UUIDSerializer},{com.google.common.collect.RegularImmutableList:backtype.storm.serialization.SerializableSerializer}],topology.max.spout.pending:16,topology.message.timeout.secs:900,drpc.request.timeout.secs:45}

 java.lang.RuntimeException:
 org.apache.thrift7.transport.TTransportException: java.net.SocketException:
 Broken pipe

  at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:112)

  at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)

  at
 com.mycompany.runtime.DeployStormTopologies.main(DeployStormTopologies.java:92)

 Caused by: org.apache.thrift7.transport.TTransportException:
 java.net.SocketException: Broken pipe

  at
 org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)

  at
 org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)

  at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)

  at
 backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:156)

  at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:145)

  at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)

  ... 2 more

 Caused by: java.net.SocketException: Broken pipe

  at java.net.SocketOutputStream.socketWrite0(Native Method)

  at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)

  at java.net.SocketOutputStream.write(SocketOutputStream.java:153)

  at
 org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)

  ... 7 more



On Tue, Mar 18, 2014 at 4:12 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

 It uploads the file in small (1024*5 bytes) chunks.

 Does this happen every time (i.e. reproducible)? What is the size of your
 topology jar?

 Can you post the server side message (I want to see the length it output).

 - Taylor

 On Mar 18, 2014, at 3:40 PM, Adam Lewis superca...@gmail.com wrote:

 Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new
 thrift max buffer size (nicely logged on the server side, although the
 client just gets a broken pipe stack trace from thrift) with an approx 6 MB
 message(!).  Increasing the configured limit solves the problem, but I
 would have thought the 1MB default should be enough.

 Does the storm submitter encode the entire topology as a single thrift
 message?  I'm really surprised that the message is coming out so large, my
 topology isn't exactly small, but it only has about 20 bolts...does anyone
 have any suggestions on how to determine why the message is so large?  Is
 this within the realm of what others have seen or am I doing something
 wrong?

 Thanks,
 Adam





Re: Where does system.out gets written?

2014-03-13 Thread Adam Lewis
import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

...

public static Logger LOG = LoggerFactory.getLogger(MyClass.class);
Check out the slf4j API for details


On Thu, Mar 13, 2014 at 9:57 AM, Manthosh Kumar T manth...@gmail.comwrote:

 Hi,
Thanks. How should I initialize the logger object?


 On 13 March 2014 19:15, James Xu xumingmi...@gmail.com wrote:

 Just use logger, it will appear in $STORM_HOME/logs/worker-xxx.log

 On Mar 13, 2014, at 9:26 PM, Manthosh Kumar T manth...@gmail.com wrote:

 Hi All,
   I'm new to storm. My topology runs in Local mode without any
 error. But when I try to submit it to the cluster, it doesn't work. I don't
 know where to check for errors. To debug I had added some
 System.out.println() statements. Where do these get printed when running
 in the cluster? If they're not printed anywhere, how can I use storm's logger to
 log into supervisor.log or anywhere else for that matter?

 --
 Cheers,
 Manthosh Kumar. T





 --
 Cheers,
 Manthosh Kumar. T




Re: Storm Message Size

2014-02-26 Thread Adam Lewis
Hi Klaus,

I've been dealing with similar use cases.  I do a couple of things (which
may not be a final solution, but it is interesting to discuss alternate
approaches): I have passed trained models in the 200MB range through storm,
but I try to avoid it. The model gets dropped into persistence and then
only ID to the model is passed through the topology.  So my training bolt
passes the whole model blob to the persistence bolt and that's it...in the
future I may even remove that step so that the model blob never gets
transferred by storm.  Also, I use separate topologies for training, and
those tend to have timeouts much higher because the train aggregator can
take quite a while.  Traditionally this would probably happen in Hadoop or
some other batch system, but I'm too busy to do the setup and storm is
handling it fine anyway.

I don't have to do any polling because I have model selection running as a
logically different step, i.e. tuple shows up for prediction, run a
selection step which finds the model ID for scoring that tuple, then it
flows on to an actual scoring bolt which retrieves the model based on ID
and applies it to the tuple.  If the creation of a new model leads you to
re-score old tuples, you could use the model write to trigger those
tuples to be replayed from some source of state such that they will pick up
the new model ID and proceed as normal.
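A rough sketch of the scoring side of that pattern (ModelStore and Model are
placeholders for whatever persistence and model types you use; only the ID
travels in the tuple):

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ScoringBolt extends BaseRichBolt {

    private transient ModelStore store;       // rebuilt in prepare(), never serialized
    private transient OutputCollector collector;
    private transient String cachedModelId;   // fetch the blob only when the ID changes
    private transient Model cachedModel;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.store = ModelStore.fromConf(conf);
    }

    @Override
    public void execute(Tuple input) {
        String modelId = input.getStringByField("modelId");
        if (!modelId.equals(cachedModelId)) {
            cachedModel = store.load(modelId);
            cachedModelId = modelId;
        }
        double score = cachedModel.score(input.getValueByField("features"));
        collector.emit(input, new Values(modelId, score));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("modelId", "score"));
    }
}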

Best,

Adam




On Wed, Feb 26, 2014 at 7:54 AM, Klausen Schaefersinho 
klaus.schaef...@gmail.com wrote:

 THX,

 the idea is good, I will keep that in mind. The only drawback is that it
 relies on polling, which I do not like too much in the PredictionBolt. Of
 course I could also pass S3 or file references around in the messages, to
 trigger an update. But for the sake of simplicity I was thinking of keeping
 everything in storm and not relying, if possible, on other systems.

 Cheers,

 Klaus


 On Wed, Feb 26, 2014 at 12:22 PM, Enno Shioji eshi...@gmail.com wrote:

 I can't comment on how large tuples fare, but about the synchronization,
 would this not make more sense?

 InputSpout -> AggregationBolt -> PredictionBolt -> OutputBolt
                     |                  ^
                     v                  |
                Agg. State         Model State
                     |                  ^
                     +--> TrainingBolt -+

 I.e. AggregationBolt writes to AggregationState, which is polled by
 TrainingBolt, which writes to ModelState. ModelState is then polled by
 PredictionBolt.

 This way, you can get rid of the large tuples as well and use instead
 something like S3 for these large states.





 On Wed, Feb 26, 2014 at 11:02 AM, Klausen Schaefersinho 
 klaus.schaef...@gmail.com wrote:

 Hi,

 I have a topology which processes events, aggregates them in some form,
 and performs some prediction based on a machine learning (ML) model. Every
 x events, one of the bolts involved in the normal processing emits a
 trainModel event, which is routed to a bolt dedicated to the training.
 Once the training is done, the new model should be sent back to the
 prediction bolt. The topology looks like:


  InputSpout - AggregationBolt - PredictionBolt - OutputBolt
                      |                 ^
                      v                 |
                 TrainingBolt ----------+


 The model can get quite large (> 100 MB), so I am not sure how this would
 impact the performance of my cluster.  Does anybody have experience with
 transmitting large messages?

 Also, the training might take a while, so the aggregation bolt should not
 trigger the training bolt if it is busy. Is there an established pattern
 for achieving this kind of synchronization? I could have some streams to
 send states, but then I would mix the data stream with a control stream,
 which I would really like to avoid. An alternative would be to use ZooKeeper
 and perform the synchronization there. Last but not least, I could also have
 the aggregation bolt write into a database and have the training bolt
 periodically wake up and read the database. Does anybody have experience
 with such a setup?

 Kind Regards,

 Klaus






Re: Unexpected behavior on message resend

2014-02-26 Thread Adam Lewis
In my case it was the state objects created as part of trident aggregation.
 Here is the final message in the thread (i.e. read bottom up):

http://mail-archives.apache.org/mod_mbox/storm-user/201312.mbox/%3CCAAYLz+p4YhF+i3LAkFoyU3nvngZXOusZWXj=0+bynrx0+tg...@mail.gmail.com%3E
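
For the date-normalization case below, the safer pattern is to emit a fresh
value instead of writing back into the input tuple. A rough sketch (the field
names and date formats are assumptions):

import java.text.ParseException;
import java.text.SimpleDateFormat;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DateNormalizerBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Read the input tuple, never write back into it.
        String id = input.getStringByField("id");
        String rawDate = input.getStringByField("date");
        collector.emit(new Values(id, normalize(rawDate)));
    }

    private String normalize(String raw) {
        // Assumed formats; replace with whatever the source actually emits.
        SimpleDateFormat in = new SimpleDateFormat("dd.MM.yyyy");
        SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd");
        try {
            return out.format(in.parse(raw));
        } catch (ParseException e) {
            // Already normalized (or unparseable): pass it through unchanged so
            // a replay of the same tuple stays idempotent.
            return raw;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "date"));
    }
}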




On Wed, Feb 26, 2014 at 10:35 AM, Harald Kirsch
harald.kir...@raytion.comwrote:

 Hi Adam,

 ok, good to know. I resolved to create the tuple from scratch in case it
 needs to be resent. I don't see where else in-place modification could hurt in
 a linear process. Am I missing something?

 Thanks,
 Harald.


 On 26.02.2014 15:48, Adam Lewis wrote:

 I've already gotten slapped around on the list for doing in place
 modifications, so let me pass it on :)

 Don't modify tuple objects in place.

 You shouldn't rely on serialization happening or not happening for
 correctness.


 On Mon, Feb 24, 2014 at 11:18 AM, Harald Kirsch
 harald.kir...@raytion.com mailto:harald.kir...@raytion.com wrote:

 Hi all,

 my TOPOLOGY_MESSAGE_TIMEOUT_SECS was slightly too low. I got a fail
 for a tuple and the spout just resent it.

 One bolt normalizes a date in place in a field of the tuple. After
 the spout resent the tuple, I got errors from the date parser
 because the date was already normalized.

 Since I currently have only one node, I know of course what happens.
 The tuple was just the very same object that was already partially
 processed when the timeout hit.

 In a distributed setup I envisage the bolt to be on another machine
 with a serialized copy of the spout's tuple such that changes to the
 tuple are not reflected in the original. Would that be true?

 I reckon from this that all processing in bolts needs to be
 idempotent if I want to be able to replay failed tuples.

 Is that true or am I doing something wrong?

 Harald.


 --
 Harald Kirsch
 Raytion GmbH
 Kaiser-Friedrich-Ring 74
 40547 Duesseldorf
 Fon +49-211-550266-0 tel:%2B49-211-550266-0
 Fax +49-211-550266-19 tel:%2B49-211-550266-19
 http://www.raytion.com



 --
 Harald Kirsch
 Raytion GmbH
 Kaiser-Friedrich-Ring 74
 40547 Duesseldorf
 Fon +49-211-550266-0
 Fax +49-211-550266-19
 http://www.raytion.com



Re: Alternative classpath settings apart from fatjar

2014-02-19 Thread Adam Lewis
Is it possible for you to use maven to build your fat jar?  Check out the
maven shade plugin, which has handling for things like META-INF, and some
special cases (like services manifests) such as merging those from multiple
jar files.

http://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html
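
For reference, a minimal shade configuration of the sort I mean (a sketch only,
plugin version omitted); the ServicesResourceTransformer is the part that merges
the META-INF/services files that Tika's parser auto-detection relies on:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services entries from all jars instead of letting
               the last one win; Tika discovers its parsers through these files. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>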

The (not recommended) alternative is to place your JARs in the supervisor's
lib directory.


On Wed, Feb 19, 2014 at 9:15 AM, Harald Kirsch harald.kir...@raytion.comwrote:

 I am trying to use tika-parsers-1.4.jar in a self-built fat jar, meaning I
 just unjar all jars into one directory and jar the result into one fat jar.

 The problem is that this produces an unreliable META-INF, so just before
 running jar, I delete the META-INF directory.

 This seemed to work, except that tika-parsers does not work anymore. No
 error, no exception, no log, it just returns nothing. And I can boil this
 down to the META-INF.

 Now, there might be a way to hand-craft a fat jar that works. However I
 wonder if there is a better way  to add additional jars to a topology?

 Harald.




Re: Alternative classpath settings apart from fatjar

2014-02-19 Thread Adam Lewis
You can put the jars in $STORM_HOME/lib/ on each of the supervisor nodes.

There are a couple of reasons why this is not recommended:
1) you need to be careful not to override any built-in storm libraries
since this would be effectively changing storm and will have unpredictable
results
2) you are creating a management issue for yourself...with just one
topology it is probably fine, but consider that as you add and remove
supervisor nodes to your cluster you need to ensure that the lib directory
stays synced with your dependencies or else you'll get spurious class
loading issues...it is pretty much equivalent to the reasoning behind _not_
putting application dependency jars in your JVM's lib directory. To put it
another way, adding a dependency to your topology goes from being a dev
task to an ops task.




On Wed, Feb 19, 2014 at 9:45 AM, Harald Kirsch harald.kir...@raytion.comwrote:

 Hi Adam,

 thanks for the hint. Maven is not part of our build environment. We are
 using an ant/ivy combination.

 Which one would be the supervisor's lib directory? Why is it not
 recommended to be used? We intend to run only one dedicated topology
 anyway, so interference between those is not an issue.

 Harald


 On 19.02.2014 15:25, Adam Lewis wrote:

 Is it possible for you to use maven to build your fat jar?  Check out
 the maven shade plugin, which has handling for things like META-INF, and
 some special cases (like services manifests) such as merging those from
 multiple jar files.

 http://maven.apache.org/plugins/maven-shade-plugin/examples/resource-
 transformers.html

 The (not recommended) alternative is to place your JARs in the
 supervisor's lib directory.


 On Wed, Feb 19, 2014 at 9:15 AM, Harald Kirsch
 harald.kir...@raytion.com mailto:harald.kir...@raytion.com wrote:

 I am trying to use tika-parsers-1.4.jar in a self-build fat jar,
 meaning I just unjar all jars into one directory and jar the result
 into one fat jar.

 The problem is, that this produces an unreliable META-INF, so just
 before running jar, I delete the META-INF directory.

 This seemed to work, except that tika-parsers does not work anymore.
 No error, no exception, no log, it just returns nothing. And I can
 boil this down to the META-INF.

 Now, there might be a way to hand-craft a fat jar that works.
 However I wonder if there is a better way  to add additional jars to
 a topology?

 Harald.



 --
 Harald Kirsch
 Raytion GmbH
 Kaiser-Friedrich-Ring 74
 40547 Duesseldorf
 Fon +49-211-550266-0
 Fax +49-211-550266-19
 http://www.raytion.com



Re: Streaming DRPC?

2014-02-16 Thread Adam Lewis
It might help to understand your use case a little bit better.  Are the
requests for a finite amount of data that you just happen to want to stream
out, or are they more akin to a subscription for an unbounded amount of
data?  Also, does the request contain a specification of what needs to be
computed?  That is, do you imagine your bolts (or various trident
components) each keeping a list of active requests and modifying their
behaviour (or data source in the case of spouts) as the requests come in?
 You might want to check out the storm-signals project for ideas of how you
could orchestrate all the components to do the right thing in face of your
requests coming in: https://github.com/ptgoetz/storm-signals


On Sat, Feb 15, 2014 at 7:30 PM, Carl Lerche m...@carllerche.com wrote:

 Hey Adam,

 Actually, that's quite a good idea. I'm glad you responded, this is a
 better approach than what I was going to attempt (aka, mega hacks). I
 understand how your approach could be done with non-DRPC trident. The
 one drawback that I can think of would be that the request message
 would need to be part of a batch, so if matches only happen every
 15~20 seconds, it would take a while for the response to start
 arriving.

 Perhaps you could elaborate more on how vanilla storm
 could be used to solve this? Perhaps it could help with the batch
 delay problem.

 Cheers,
 Carl

 On Sat, Feb 15, 2014 at 4:42 AM, Adam Lewis m...@adamlewis.com wrote:
  Hi Carl,
 
  DRPC is inherently synchronous in the way it works so if I understand
 what
  you are trying to do correctly then I suggest you stick to non-DRPC
 trident
  or even vanilla storm.  You can setup some messaging queues to handle the
  input (request) and output (streaming result).  Include a field in the
 input
  tuple that can be used to correlate any downstream results (where that
 ID is
  client generated), then create a client which handles publishing to the
  input queue and subscribing to the output queue (filtering on messages
 which
  have the input correlation id).  You can partition your storm topology
 (and
  the input and output queues) on that correlation ID to achieve some load
  balancing.  Finally, if your output streams are infinite, you need some
  mechanism to stop them...
 
  As a side benefit, you can overcome some limitations of DRPC such as no
  control over serialization and even have multiple trident streams all
  writing to your output queue (whereas DRPC doesn't support that sort of
  branching in the topology).
 
  Adam
 
 
  On Fri, Feb 14, 2014 at 7:06 PM, Carl Lerche m...@carllerche.com wrote:
 
  Hello,
 
  I noticed that the DRPC client only allows a single response (and for
  that response to be JSON encoded). I was hoping to implement some sort
  of DRPC stream where I receive a constant stream of data based on a given
  query. Also, is there a way to serialize the response using Kryo
  instead of JSON? The specific format of my data is not very JSON
  friendly.
 
  Cheers,
  Carl
 
 



Re: Streaming DRPC?

2014-02-15 Thread Adam Lewis
Hi Carl,

DRPC is inherently synchronous in the way it works so if I understand what
you are trying to do correctly then I suggest you stick to non-DRPC trident
or even vanilla storm.  You can setup some messaging queues to handle the
input (request) and output (streaming result).  Include a field in the
input tuple that can be used to correlate any downstream results (where
that ID is client generated), then create a client which handles publishing
to the input queue and subscribing to the output queue (filtering on
messages which have the input correlation id).  You can partition your
storm topology (and the input and output queues) on that correlation ID to
achieve some load balancing.  Finally, if your output streams are infinite,
you need some mechanism to stop them...

As a side benefit, you can overcome some limitations of DRPC such as no
control over serialization and even have multiple trident streams all
writing to your output queue (whereas DRPC doesn't support that sort of
branching in the topology).
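
A rough sketch of the client side, just to show the correlation-ID tagging and
filtering (the Publisher/Subscriber/MessageHandler interfaces are hypothetical
stand-ins for whichever queueing system you pick):

public class StreamingRequestClient {

    // Hypothetical queue interfaces; stand-ins for your broker's client API.
    public interface Publisher { void publish(String message); }
    public interface Subscriber { void subscribe(MessageHandler handler); }
    public interface MessageHandler { void handle(String message); }

    private final Publisher requestQueue;
    private final Subscriber resultQueue;

    public StreamingRequestClient(Publisher requestQueue, Subscriber resultQueue) {
        this.requestQueue = requestQueue;
        this.resultQueue = resultQueue;
    }

    public void request(final String correlationId, String query, final MessageHandler onResult) {
        // Tag the request so the topology can echo the ID on every result it emits.
        requestQueue.publish(correlationId + "|" + query);
        resultQueue.subscribe(new MessageHandler() {
            public void handle(String message) {
                // Pass through only the results that belong to this request.
                if (message.startsWith(correlationId + "|")) {
                    onResult.handle(message.substring(correlationId.length() + 1));
                }
            }
        });
    }
}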

Adam


On Fri, Feb 14, 2014 at 7:06 PM, Carl Lerche m...@carllerche.com wrote:

 Hello,

 I noticed that the DRPC client only allows a single response (and for
 that response to be JSON encoded). I was hoping to implement some sort
 of DRPC stream where I receive a constant stream of data based on a given
 query. Also, is there a way to serialize the response using Kryo
 instead of JSON? The specific format of my data is not very JSON
 friendly.

 Cheers,
 Carl



Re: storm jar slowness

2014-02-14 Thread Adam Lewis
thanks for the pointer...it looks like it is using default 15K chunk sizes,
I'll see if tweaking that has any effect.


On Fri, Feb 14, 2014 at 5:48 PM, Jon Logan jmlo...@buffalo.edu wrote:

 The code that uploads it is
 at backtype.storm.StormSubmitter#submitJar(java.util.Map,
 java.lang.String). It looks like it's just a simple upload over
 Thrift...SCP is specifically designed for file uploads, and is probably
 better-tuned for large transfers, through compression, or whatever other
 means.

 You could just upload the Jar using SCP, and then submit it from the
 server itself. I think many (most?) use cases are submitted on a local
 network, where upload speed is not a concern.


 On Fri, Feb 14, 2014 at 5:18 PM, Adam Lewis superca...@gmail.com wrote:

 I've seen this issue raised on the list in the past, but with no clear
 suggestions:

 storm jar is very slow at sending the jar file, averaging about 250
 KB/s between my system and EC2...is there some reason for this in the way
 storm sends the jar?  scp goes about 2.5 MB/s, the same 10x difference I've
 seen reported previously.

 I'm using storm 0.9.0.1

 Any ideas?

 Thanks,
 Adam





Re: storm jar slowness

2014-02-14 Thread Adam Lewis
This is promising, 150KB chunk size is giving me over 1 MB/sec; 300KB chunk
size up to 2 MB/sec although spikier performance.  My experimental setup
leaves a lot to be desired here, but it seems pretty conclusive that 15KB
is not optimal (at least over the Internet).  As far as I can tell the only
downside of larger chunks is that thrift keeps an entire chunk in memory.

Something in the 100 to 200 KB range seems reasonable.  Any thoughts?
 Perhaps I should open a JIRA ticket for this.

As for use cases: I'm sure in production everything will be on the same
network.  For me, it is the transition from using local cluster to
deploying to a real cluster (which happens to run on AWS across the
Internet from my dev machine) and dealing with classpath, serialization and
other fun issues that don't crop up in local mode...and my fat jar hasn't
been put on a diet yet so it is large...all of which adds up to long
code/build/test cycle times.

But, increasing chunk size is really helping, and the 15KB default seems
arbitrary.  If this seems reasonable I'll file a JIRA and a PR.
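
For anyone following along, the upload is essentially a read-a-chunk,
send-a-chunk loop of this shape (a generic sketch, not the actual StormSubmitter
code; uploadChunk stands in for the Thrift call), so each chunk costs a round
trip and larger chunks amortize the latency:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;

public class ChunkedUploader {

    // The default discussed above is 15 * 1024; something in the 100-200 KB
    // range is what improved throughput in my (informal) tests.
    private static final int CHUNK_SIZE = 150 * 1024;

    // Stand-in for the per-chunk call the real submitter makes over Thrift.
    public interface ChunkSink {
        void uploadChunk(byte[] chunk) throws IOException;
    }

    public static void upload(String jarPath, ChunkSink sink) throws IOException {
        FileInputStream in = new FileInputStream(jarPath);
        try {
            byte[] buffer = new byte[CHUNK_SIZE];
            int read;
            // One call per chunk: on a high-latency link, fewer and larger chunks
            // mean fewer round trips and better effective bandwidth.
            while ((read = in.read(buffer)) != -1) {
                sink.uploadChunk(read == buffer.length ? buffer : Arrays.copyOf(buffer, read));
            }
        } finally {
            in.close();
        }
    }
}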


On Fri, Feb 14, 2014 at 7:57 PM, Adam Lewis gm...@adamlewis.com wrote:

 thanks for the pointer...it looks like it is using default 15K chunk
 sizes, I'll see if tweaking that has any effect.


 On Fri, Feb 14, 2014 at 5:48 PM, Jon Logan jmlo...@buffalo.edu wrote:

 The code that uploads it is
 at backtype.storm.StormSubmitter#submitJar(java.util.Map,
 java.lang.String). It looks like it's just a simple upload over
 Thrift...SCP is specifically designed for file uploads, and is probably
 better-tuned for large transfers, through compression, or whatever other
 means.

 You could just upload the Jar using SCP, and then submit it from the
 server itself. I think many (most?) use cases are submitted on a local
 network, where upload speed is not a concern.


 On Fri, Feb 14, 2014 at 5:18 PM, Adam Lewis superca...@gmail.com wrote:

 I've seen this issue raised on the list in the past, but with no clear
 suggestions:

 storm jar is very slow at sending the jar file, averaging about 250
 KB/s between my system and EC2...is there some reason for this in the way
 storm sends the jar?  scp goes about 2.5 MB/s, the same 10x difference I've
 seen reported previously.

 I'm using storm 0.9.0.1

 Any ideas?

 Thanks,
 Adam






Re: storm jar slowness

2014-02-14 Thread Adam Lewis
Great.  STORM-241 has been filed.

https://issues.apache.org/jira/browse/STORM-241


On Fri, Feb 14, 2014 at 8:42 PM, Nathan Marz nat...@nathanmarz.com wrote:

 Yes, it is arbitrary. Opening an issue for this is a good idea.


 On Fri, Feb 14, 2014 at 5:24 PM, Adam Lewis gm...@adamlewis.com wrote:

 This is promising, 150KB chunk size is giving me over 1 MB/sec; 300KB
 chunk size up to 2 MB/sec although spikier performance.  My experimental
 setup leaves a lot to be desired here, but it seems pretty conclusive that
 15KB is not optimal (at least over-the-internet).  As far as I can tell the
 only downside of larger chunks is that thrift keeps an entire chunk in
 memory.

 Something in the 100 to 200 KB range seems reasonable.  Any thoughts?
  Perhaps I should open a JIRA ticket for this.

 As for use cases: I'm sure in production everything will be on the same
 network.  For me, it is the transition from using local cluster to
 deploying to a real cluster (which happens to run on AWS across the
 Internet from my dev machine) and dealing with classpath, serialization and
 other fun issues that don't crop up in local mode...and my fat jar hasn't
 been put on a diet yet so it is large...all of which adds up to long
 code/build/test cycle times.

 But, increasing chunk size is really helping, and the 15KB default seems
 arbitrary.  If this seems reasonable I'll file a JIRA and a PR.


 On Fri, Feb 14, 2014 at 7:57 PM, Adam Lewis gm...@adamlewis.com wrote:

 thanks for the pointer...it looks like it is using default 15K chunk
 sizes, I'll see if tweaking that has any effect.


 On Fri, Feb 14, 2014 at 5:48 PM, Jon Logan jmlo...@buffalo.edu wrote:

 The code that uploads it is
 at backtype.storm.StormSubmitter#submitJar(java.util.Map,
 java.lang.String). It looks like it's just a simple upload over
 Thrift...SCP is specifically designed for file uploads, and is probably
 better-tuned for large transfers, through compression, or whatever other
 means.

 You could just upload the Jar using SCP, and then submit it from the
 server itself. I think many (most?) use cases are submitted on a local
 network, where upload speed is not a concern.


 On Fri, Feb 14, 2014 at 5:18 PM, Adam Lewis superca...@gmail.comwrote:

 I've seen this issue raised on the list in the past, but with no clear
 suggestions:

 storm jar is very slow at sending the jar file, averaging about 250
 KB/s between my system and EC2...is there some reason for this in the way
 storm sends the jar?  scp goes about 2.5 MB/s, the same 10x difference 
 I've
 seen reported previously.

 I'm using storm 0.9.0.1

 Any ideas?

 Thanks,
 Adam







 --
 Twitter: @nathanmarz
 http://nathanmarz.com



Re: http-client version conflict

2014-02-06 Thread Adam Lewis
My $0.02 on this subject:

Without going down the path of class loader or OSGi mania and becoming a
full container, I'd definitely be in favor of storm relocating its own
dependencies.  In this way edge cases around things like reflection can be
handled once within storm rather than burdening every topology builder with
those details.  Part of the problem seems to be that storm makes extensive
use (directly or transitively) of a lot of go-to utility libraries like
guava, thrift, jodatime, json-simple, snakeyaml, commons-io, etc... I'm
sure that leveraging these libraries allowed storm's development to proceed
rapidly, but from a maturity perspective, it is problematic to impose these
version choices on users.  And while I might want Storm to, say, try to
track the latest Guava version, that same policy could be very problematic
for others.

If storm can relocate even some of its own dependencies, I think that would
be a great help to me at least.  Longer term, I wonder how much of some of
these libraries are really being used.  For example, is clj-time (and by
extension joda-time) really needed? Or just a small fraction of the
functionality in that library?  I can probably pitch in some of the effort
required to do this, if this is the direction people want to go in.
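
To make the relocation idea concrete, this is roughly what a shade relocation
stanza looks like, whether storm or a topology does it (a sketch; the shaded
package prefix is only illustrative):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Rewrite the bundled copy so it cannot collide with the version
                 already on the worker classpath; the prefix is illustrative. -->
            <pattern>org.apache.http</pattern>
            <shadedPattern>shaded.org.apache.http</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>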




On Thu, Feb 6, 2014 at 8:44 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

 I'm glad the shader plugin worked for you.

 Updating dependencies can be tricky as it can easily introduce
 regressions.

 Ultimately we need to figure out the best solution to avoiding conflicts
 between user code (i.e. dependencies in topology jar files) and Storm's
 libraries.

 The classloader approach has been attempted, but IMO Storm's use of
 serialization complicates things significantly. Package relocation seems to
 be a relatively lightweight solution.

 If that's a direction we pursue, then it introduces the question of
 whether Storm should relocate its dependencies, or if that should be left
 up to the user (topology developer).

 Elastic Search has gone down the path of relocating some of their
 dependencies [1] (not necessarily an endorsement, just an observation).

 I've CC'd dev@ since this is all related to the infamous issue #115,
 which is now STORM-129 [2].

 - Taylor

 [1]
 https://github.com/elasticsearch/elasticsearch/blob/master/pom.xml#L474
 [2] https://issues.apache.org/jira/browse/STORM-129





 On Feb 6, 2014, at 7:25 PM, Vinay Pothnis vinay.poth...@gmail.com wrote:

 Thank you all for the replies! The shade plugin solution seems to work for
 us.

 I wonder if we can create a JIRA ticket for storm to upgrade the
 http-client library as part of their next release.

 -Vinay


 On Thu, Feb 6, 2014 at 2:38 PM, Michael Rose mich...@fullcontact.comwrote:

 We've done this with SLF4j and Guava as well without issues.

 Michael Rose (@Xorlev https://twitter.com/xorlev)
 Senior Platform Engineer, FullContact http://www.fullcontact.com/
 mich...@fullcontact.com


 On Thu, Feb 6, 2014 at 3:03 PM, Mark Greene m...@evertrue.com wrote:

 We had this problem as well. We modified our chef cookbook to just
 replace the older version with the newer one and storm didn't complain or
 have any other issues as a result.


 On Wed, Feb 5, 2014 at 10:31 AM, P. Taylor Goetz ptgo...@gmail.comwrote:

 Your best bet is probably  to use the shade plugin to relocate the
 http-client package so it doesn't conflict with the version storm uses.

 Storm does this with the libthrift dependency in storm-core:


 https://github.com/apache/incubator-storm/blob/master/storm-core/pom.xml#L220

 (You can ignore the clojure transformer in that config, unless you have
 non-AOT clojure code that uses the http-client library).

 More information on using the shade plugin to do package relocations
 can be found here:


 http://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

 - Taylor

 On Feb 4, 2014, at 4:27 PM, Vinay Pothnis vinay.poth...@gmail.com
 wrote:

  Hello,
 
  I am using storm version 0.9.0.1.
  My application depends on apache http-client version 4.3.2 - but
 storm depends on http-client version 4.1.1.
 
  What is the best way to override this dependency?
 
  Thanks
  Vinay








Re: Not able to build wurstmeister storm-kafka-0.8-plus

2014-01-14 Thread Adam Lewis
I've found that Maven can drift away from MacOS in terms of which java
version they use (even if you have your preferred version on the path and
JAVA_HOME).  Did you compare java -version and mvn -version?  Something is
getting built by Java 7; the latest storm-kafka-0.8-plus pom on github
depends on storm 0.9.0, which was accidentally built using Java 7. Check out
storm 0.9.0.1 for a Java 6 build...or move to Java 7 if you can.


On Tue, Jan 14, 2014 at 2:00 AM, Krishnanand Khambadkone 
kkhambadk...@yahoo.com wrote:

 Hi,  When I do a mvn package I get this message

 Tests in error:

 testMultiplePartitionsOnDifferentHosts(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils : Unsupported major.minor version 51.0
   testSwitchHostForPartition(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils
   testGetBrokerInfo(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils
   testMultiplePartitionsOnSameHost(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils

 My java version is, on mac os x 10.7.5

 java version 1.6.0_29
 Java(TM) SE Runtime Environment (build 1.6.0_29-b11-402-11M4609)
 Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02-402, mixed mode)



Re: Not able to build wurstmeister storm-kafka-0.8-plus

2014-01-14 Thread Adam Lewis
I think Sasi's comment was along the right lines.  If you specify the Storm
version in your own POM it will take precedence over the version specified
in storm-kafka-0.8-plus.  Note that storm 0.9.0.1 fixes the issue of Java 7
binaries.
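
Concretely, something like this in your own pom.xml should win out (a sketch;
the pre-Apache releases were published under the plain storm group id, so
verify the coordinates against the repository you resolve from):

<dependency>
  <!-- Coordinates for the pre-Apache releases; adjust if your mirror differs. -->
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <version>0.9.0.1</version>
  <!-- provided, because the cluster supplies Storm at runtime -->
  <scope>provided</scope>
</dependency>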


On Tue, Jan 14, 2014 at 12:24 PM, Krishnanand Khambadkone 
kkhambadk...@yahoo.com wrote:

 Adam,  thank you for the tips.  Unfortunately moving to java 7 is not an
 option for me.  Even moving to a higher version of 1.6 (1.6_51 and above)
 breaks hadoop and creates a mess.   Is there a workaround for this,  such
 as specifying the source and target version in pom.xml or some such thing.




   On Tuesday, January 14, 2014 6:31 AM, Adam Lewis m...@adamlewis.com
 wrote:
  I've found that Maven can drift away from MacOS in terms of which java
 version they use (even if you have your preferred version on the path and
 JAVA_HOME).  Did you compare java -version and mvn -version?  Something is
 getting built by Java 7; the latest storm-kafka-0.8-plus pom on github
 depends on storm 0.9.0 which was accidentally built using Java 7, check out
 storm 0.9.0.1 for Java6 version...or move to Java 7 if you can.


 On Tue, Jan 14, 2014 at 2:00 AM, Krishnanand Khambadkone 
 kkhambadk...@yahoo.com wrote:

 Hi,  When I do a mvn package I get this message

 Tests in error:

 testMultiplePartitionsOnDifferentHosts(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils : Unsupported major.minor version 51.0
   testSwitchHostForPartition(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils
   testGetBrokerInfo(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils
   testMultiplePartitionsOnSameHost(storm.kafka.DynamicBrokersReaderTest):
 backtype/storm/utils/Utils

 My java version is, on mac os x 10.7.5

 java version 1.6.0_29
 Java(TM) SE Runtime Environment (build 1.6.0_29-b11-402-11M4609)
 Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02-402, mixed mode)







Re: Is storm suitable for processing tuples with some dependency

2014-01-12 Thread Adam Lewis
Probably yes, but how you do it will depend on the specifics of your use
case.  For instance, you will have to decide if partial pairings are best
kept in memory or in a more reliable state store.  Also, whether to use
trident or not.  I found Svend's article on this sort of thing to be a
great starting point when thinking through this sort of problem:

http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
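
If the in-memory route fits your volumes, the pairing bolt can be as small as
this sketch (sessionId and payload are made-up field names; it assumes a
fieldsGrouping on the session key so both halves reach the same task, and a
real version needs eviction for pairs that never complete):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SessionJoinBolt extends BaseRichBolt {

    private transient OutputCollector collector;
    // Partial sessions held in memory, keyed by the field the request and the
    // response share. For larger or longer-lived state, use an external store.
    private transient Map<String, Tuple> pending;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new HashMap<String, Tuple>();
    }

    @Override
    public void execute(Tuple input) {
        String sessionId = input.getStringByField("sessionId");
        Tuple other = pending.remove(sessionId);
        if (other == null) {
            // First half of the pair: park it (unacked) until its counterpart
            // arrives, or until it times out and is replayed by the spout.
            pending.put(sessionId, input);
        } else {
            // Emit the joined session anchored to both halves; which half is the
            // request and which the response depends on arrival order, so a real
            // version would also carry a type field.
            collector.emit(Arrays.asList(other, input),
                    new Values(sessionId, other.getValueByField("payload"),
                            input.getValueByField("payload")));
            collector.ack(other);
            collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sessionId", "first", "second"));
    }
}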

Adam


On Sun, Jan 12, 2014 at 10:42 AM, 李家宏 jh.li...@gmail.com wrote:

 Hi, all

 Is storm suitable for processing tuples with some dependency?
 For example, one tuple contains a request and another tuple contains the
 corresponding response, so the two tuples depend on each other. Is it
 possible to treat the two tuples as a complete session during stream
 processing?

 Regards

 --

 ==

 Gvain

 Email: jh.li...@gmail.com