Re: Multiple groupBy with Trident
You can't emit on two streams in Trident, but you can use the DSL to split a stream into two processing paths, e.g.:

    Stream queryResultStream = someState.stateQuery(...);
    Stream firstGroupingAggregate = queryResultStream
        .groupBy(/* first grouping */)
        .aggregate(...);
    Stream secondGroupingAggregate = queryResultStream
        .groupBy(/* second grouping */)
        .aggregate(...);

I'm not entirely sure I understand the original request, but if the idea of doing different processing on identical streams works for you, then this is the idiom for doing that.

On Thu, Sep 18, 2014 at 4:04 AM, Bechennec, Marion mar...@dictanova.com wrote:

Hi,

Thanks for the answer, but in Trident it doesn't seem possible to emit on different streams from the same bolt, so it wouldn't be possible to combine the output of the two streams.

2014-09-15 20:54 GMT+02:00 John Reilly j...@inconspicuous.org:

Can you split the initial stream into two identical streams (using a bolt), then perform a groupBy on each of the streams, and then combine the output of those two groupBys?

On Fri, Sep 12, 2014 at 9:51 AM, Bechennec, Marion mar...@dictanova.com wrote:

Hi,

For one of our applications we are trying to perform multiple groupBy statements on the same stream. Ideally it would look like this:

    .stateQuery(...)
    .groupBy(new Fields("field"))
    .chainedAgg()
    .groupBy(new Fields("anotherField"))
    .chainedAgg()
    .aggregate(...)
    .chainEnd()
    .aggregate()
    .aggregate()
    .chainEnd();

Obviously this doesn't work. We've tried several things and came up with something like this:

    .stateQuery(...)
    .groupBy(new Fields("field", "anotherField"))
    .aggregate(/* Do something */)
    .groupBy(new Fields("lemma"))
    .chainedAgg()
    .aggregate()
    .aggregate()
    .chainEnd();

However, this doesn't work either: we are not able to re-emit the initial values on the input stream of the second groupBy statement. Any thoughts on how this can be accomplished?

Thank you for your help,
Marion
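The split-then-group idiom above can be sketched in plain Java, with java.util.stream standing in for the Trident DSL (the record and field names here are invented for illustration, not part of the original thread):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of the fan-out idiom: one source feeds two
// independent groupings, each producing its own aggregate.
public class TwoGroupings {

    public record Row(String field, String anotherField) {}

    // First processing path: group and count by "field".
    public static Map<String, Long> byField(List<Row> source) {
        return source.stream()
                .collect(Collectors.groupingBy(Row::field, Collectors.counting()));
    }

    // Second processing path: group and count by "anotherField",
    // reading from the very same source.
    public static Map<String, Long> byAnotherField(List<Row> source) {
        return source.stream()
                .collect(Collectors.groupingBy(Row::anotherField, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Row> source = List.of(
                new Row("a", "x"), new Row("a", "y"), new Row("b", "x"));
        System.out.println(byField(source));
        System.out.println(byAnotherField(source));
    }
}
```

As in the Trident version, nothing is shared between the two paths except the source, so each grouping can be tuned or persisted independently.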
Re: Packaging Multiple Topology Jars
Others correct me if I'm wrong, but I believe using your multi-topology uber jar to deploy only one topology should not affect the other running topologies that were deployed (in the past) using an earlier version of the same jar; I believe the jar is stored at deployment time under a unique ID.

You also don't strictly need a separate main method for each topology, assuming you parameterize your use of the StormSubmitter. I've done this with success: I have one main method which deploys multiple topologies in a loop. If a topology is already deployed, an exception is thrown (which I catch inside the loop) and I continue on. My dev deployment process then becomes: kill the topologies I wish to re-deploy, then run the one main method, which tries to submit all topologies but only succeeds on the ones which had been killed.

On Thu, Jun 26, 2014 at 2:37 PM, Sandon Jacobs sjac...@appia.com wrote:

We currently have 1 project in Git containing multiple topologies. With this model, I use 1 compiled artifact containing several "topology" classes, each with a main method to configure, build, and submit the topology. Some of these topologies share common components (bolt classes, util classes, etc.). I do not necessarily need to deploy the newest version of each topology every time we release code.

Here are a couple of options I have thought of:
- Break the project up into a parent module, keeping 1 Git repo, with a child module for common components and a child for each topology.
- Break the common code into its own Git repo, then each topology into its own Git repo (don't really wanna do this one at all).
- Have the gradle build create a JAR per topology, using exclusion/inclusion in a gradle task.

I see PROs and CONs to each approach. I am curious how others are maintaining this model. Do you have a separate compiled artifact for each topology? Do you use a similar approach to ours?

Thanks in advance…
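The catch-and-continue deployment loop described above can be sketched as follows. The Submitter interface is a stand-in for Storm's StormSubmitter, and IllegalStateException stands in for the AlreadyAliveException Storm throws when a topology with the same name is already running; all names here are invented:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "one main method, many topologies": try to submit each
// topology and treat "already running" as a skip rather than a failure.
public class DeployAll {

    public interface Submitter {
        // Throws IllegalStateException if the topology is already deployed.
        void submit(String topologyName);
    }

    public static List<String> deployAll(List<String> names, Submitter submitter) {
        List<String> deployed = new ArrayList<>();
        for (String name : names) {
            try {
                submitter.submit(name);
                deployed.add(name);
            } catch (IllegalStateException alreadyRunning) {
                // Topology already deployed: skip it and continue the loop.
            }
        }
        return deployed;
    }
}
```

Killing a topology and re-running this main then redeploys exactly the killed ones, which matches the workflow described above.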
Re: persistentAggregate chaining in Trident?
It sounds like you want a persistentAggregate to occur before the stream is grouped by word. Would this work, in pseudo Trident DSL?

    Stream words = ...;
    words.groupBy(word).persistentAggregate(...);
    words.persistentAggregate(...);

On Tue, Jun 24, 2014 at 12:45 PM, Can Gencer cgen...@gmail.com wrote:

Hi all,

I'm wondering what the best way is to chain persistent aggregations in Trident. Let's say I have a running count of words, and I also want another aggregation that calculates the total count of all words using the results of the previous aggregation.

I can use a persistentAggregate to calculate the running total for each word. However, I can't chain another persistentAggregate directly to that, as it would only add the new value instead of first decrementing the old value for the group.

What is the best solution to this scenario? Does Storm have a way to handle this out of the box?

Regards,
Can
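The suggestion above is to feed both aggregations from the same source stream, so the total never has to "undo" a per-word value. A plain-Java sketch of the two aggregations (method names invented; java.util.stream stands in for Trident):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Per-word counts and the overall total are both computed from the same
// source, so the total aggregation never reads (or has to decrement)
// the per-word aggregate.
public class WordTotals {

    // What the grouped persistentAggregate would hold: word -> count.
    public static Map<String, Long> perWordCounts(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    // What the ungrouped persistentAggregate would hold: total count,
    // taken directly from the raw stream.
    public static long totalCount(List<String> words) {
        return words.size();
    }
}
```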
Re: Trident Binned Aggregation Design Help
Do you have any insight into how DRPC plays into this? The groupBy bolt boundary makes perfect sense, and I understand how that maps to some collection of bolts that would process different groupings (depending on parallelism). What stumps me are the cases where adding multiple DRPC spouts to the topology seems to result in the whole thing being duplicated for each spout. I can see some extra tracking mechanisms get stood up to track DRPC requests and then match request with response, but I'm still not sure why that wouldn't scale linearly with the number of DRPC spouts.

On Tue, Jun 17, 2014 at 8:05 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

Not at the moment, but I will be adding that functionality (Trident state) to the storm-hbase project very soon. Currently it only supports MapState.

-Taylor

On Jun 17, 2014, at 6:09 PM, Andrew Serff and...@serff.net wrote:

I'm currently using Redis, but I'm by no means tied to it. Are there any examples of using either to do that?

Andrew

On Tue, Jun 17, 2014 at 3:51 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

Andrew/Adam,

Partitioning operations like groupBy() form the bolt boundaries in Trident topologies, so the more you have, the more bolts you will have and thus, potentially, more network transfer. What backing store are you using for persistence? If you are using something with counter support like HBase or Cassandra, you could leverage that in combination with Trident's exactly-once semantics to let it handle the counting, and potentially greatly reduce the complexity of your topology.

-Taylor

On Jun 17, 2014, at 5:15 PM, Adam Lewis m...@adamlewis.com wrote:

I, too, am eagerly awaiting a reply from the list on this topic. I hit up against max topology size limits doing something similar with Trident. There are definitely linear changes to a Trident topology that result in quadratic growth of the compiled Storm topology size, such as adding DRPC spouts. Sadly, the compilation process from Trident to plain Storm remains somewhat opaque to me and I haven't had time to dig deeper. My workaround has been to limit myself to one DRPC spout per topology and programmatically build multiple topologies for the variations (which results in a lot of structural and functional duplication of deployed topologies, but at least not code duplication). Trident presents a seemingly nice abstraction, but from my point of view it is a leaky one if I need to understand the compilation process to know why adding a single DRPC spout doubles the topology size.

On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff and...@serff.net wrote:

Is there no one out there who can help with this? If I use this paradigm, my topology ends up having about 170 bolts. Then I add a DRPC stream and I have about 50 spouts. All of this adds up to a topology that I can't even submit because it's too large (and I've bumped the Trident max to 50 MB already...). It seems like I'm thinking about this wrong, but I haven't been able to come up with another way to do it. I don't really see how using vanilla Storm would help; maybe someone can offer some guidance?

Thanks
Andrew

On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff and...@serff.net wrote:

Hello,

I'm new to using Trident and had a few questions about the best way to do things in this framework. I'm trying to build a real-time streaming aggregation system, and Trident seems to have a very easy framework to allow me to do that. I have a basic setup working, but as I am adding more counters, the performance becomes very slow and eventually I start having many failures.

At the basic level, here is what I want to do: have an incoming stream that reads data via a KafkaSpout; take the Kafka stream, parse it, and output multiple fields; then keep many different counters for those fields in the data. For example, say it was the Twitter stream. I may want to count:

- A counter for each username I come across, so how many times I have received a tweet from each user
- A counter for each hashtag, so you know how many tweets mention a hashtag
- Binned counters based on date for each tweet (i.e. how many tweets in 2014, June 2014, June 08 2014, etc.)

The list could continue, but this can add up to hundreds of counters running in real time. Right now I have something like the following:

    TridentTopology topology = new TridentTopology();
    KafkaSpout spout = new KafkaSpout(kafkaConfig);
    Stream stream = topology.newStream("messages", spout).shuffle()
        .each(new Fields("str"), new FieldEmitter(), new Fields("username", "hashtag"));
    stream.groupBy(new Fields("username"))
        .persistentAggregate(stateFactory, new Fields("username"), new Count(), new Fields("count"))
        .parallelismHint(6);
    stream.groupBy(new Fields("hashtag"))
        .persistentAggregate(stateFactory, new Fields("hashtag"), new Count(), new Fields("count"))
        .parallelismHint(6);
Re: Trident Binned Aggregation Design Help
I, too, am eagerly awaiting a reply from the list on this topic. I hit up against max topology size limits doing something similar with Trident. There are definitely linear changes to a Trident topology that result in quadratic growth of the compiled Storm topology size, such as adding DRPC spouts. Sadly, the compilation process from Trident to plain Storm remains somewhat opaque to me and I haven't had time to dig deeper. My workaround has been to limit myself to one DRPC spout per topology and programmatically build multiple topologies for the variations (which results in a lot of structural and functional duplication of deployed topologies, but at least not code duplication). Trident presents a seemingly nice abstraction, but from my point of view it is a leaky one if I need to understand the compilation process to know why adding a single DRPC spout doubles the topology size.

On Tue, Jun 17, 2014 at 4:33 PM, Andrew Serff and...@serff.net wrote:

Is there no one out there who can help with this? If I use this paradigm, my topology ends up having about 170 bolts. Then I add a DRPC stream and I have about 50 spouts. All of this adds up to a topology that I can't even submit because it's too large (and I've bumped the Trident max to 50 MB already...). It seems like I'm thinking about this wrong, but I haven't been able to come up with another way to do it. I don't really see how using vanilla Storm would help; maybe someone can offer some guidance?

Thanks
Andrew

On Mon, Jun 9, 2014 at 11:45 AM, Andrew Serff and...@serff.net wrote:

Hello,

I'm new to using Trident and had a few questions about the best way to do things in this framework. I'm trying to build a real-time streaming aggregation system, and Trident seems to have a very easy framework to allow me to do that. I have a basic setup working, but as I am adding more counters, the performance becomes very slow and eventually I start having many failures.

At the basic level, here is what I want to do: have an incoming stream that reads data via a KafkaSpout; take the Kafka stream, parse it, and output multiple fields; then keep many different counters for those fields in the data. For example, say it was the Twitter stream. I may want to count:

- A counter for each username I come across, so how many times I have received a tweet from each user
- A counter for each hashtag, so you know how many tweets mention a hashtag
- Binned counters based on date for each tweet (i.e. how many tweets in 2014, June 2014, June 08 2014, etc.)

The list could continue, but this can add up to hundreds of counters running in real time. Right now I have something like the following:

    TridentTopology topology = new TridentTopology();
    KafkaSpout spout = new KafkaSpout(kafkaConfig);
    Stream stream = topology.newStream("messages", spout).shuffle()
        .each(new Fields("str"), new FieldEmitter(), new Fields("username", "hashtag"));
    stream.groupBy(new Fields("username"))
        .persistentAggregate(stateFactory, new Fields("username"), new Count(), new Fields("count"))
        .parallelismHint(6);
    stream.groupBy(new Fields("hashtag"))
        .persistentAggregate(stateFactory, new Fields("hashtag"), new Count(), new Fields("count"))
        .parallelismHint(6);

Repeat something similar for everything you want a unique count for. I end up having hundreds of groupBys, each with its own aggregator. I have so far only run this on my local machine and not on a cluster yet, but I'm wondering if this is the correct design for something like this, or if there is a better way to distribute this within Trident to make it more efficient. Any suggestions would be appreciated!

Thanks!
Andrew
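One way to cut down the number of groupBy paths for the binned date counters (each of which is a bolt boundary, per the reply above) is to emit several composite bin keys per tuple and run a single grouped count over them. A plain-Java sketch of the idea; the bin-key format is invented for illustration:

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Each tweet date emits one key per bin (year, year-month,
// year-month-day); a single grouped count over these keys replaces
// three separate groupBy/persistentAggregate paths.
public class BinnedCounts {

    public static List<String> binKeys(LocalDate d) {
        String year = String.valueOf(d.getYear());
        String month = String.format("%s-%02d", year, d.getMonthValue());
        String day = String.format("%s-%02d", month, d.getDayOfMonth());
        return List.of(year, month, day);
    }

    public static Map<String, Long> count(List<LocalDate> dates) {
        Map<String, Long> counts = new HashMap<>();
        for (LocalDate d : dates) {
            for (String key : binKeys(d)) {
                counts.merge(key, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

In Trident terms this would mean one `each` that emits the bin keys, one groupBy on the key field, and one persistentAggregate, rather than a groupBy per bin granularity.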
Re: Trident Stream Join with Self
Since you are doing a self join, you don't need to actually use the Trident join, or the multireducer on which it is based. You could group the stream on your join key, then write an aggregator which collects all the tuples in each group and emits the cross product at the end of each batch (or in a streaming fashion, where each incremental tuple emits the cross of that tuple with all the tuples already received), and then finally implement a filter function downstream of each aggregator's output. Your aggregator will have to take care of the * in your SQL example, since typically aggregators only keep the join key plus the aggregated value.

On Thu, Apr 24, 2014 at 5:46 PM, Charles LeDoux charles.a.led...@gmail.com wrote:

Is it possible to join a Trident stream with itself? My particular use case is that I want to take the cross product of all the incoming tuples for a batch and then only keep the joined tuples containing a known value. I believe the SQL for what I am trying to accomplish is:

    SELECT * FROM table AS t1
    JOIN table AS t2 ON field1
    WHERE t1.field2 = known value;

My intention was to do a self join on my stream and then run the now-joined stream through a filter.

Thanks,
Charles

--
PhD Candidate; University Fellow
University of Louisiana at Lafayette
Center for Advanced Computer Studies
http://charlesledoux.com
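The group-then-cross-product approach described above can be sketched in plain Java, with a Map standing in for Trident's grouped batch state (record and field names mirror the SQL example but are otherwise invented):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Self join without Trident's join: group the batch by the join key,
// then emit the cross product within each group, keeping only pairs
// whose left side carries the known value (the WHERE clause).
public class SelfJoin {

    public record Tup(String field1, String field2) {}

    public static List<List<Tup>> join(List<Tup> batch, String knownValue) {
        Map<String, List<Tup>> groups =
                batch.stream().collect(Collectors.groupingBy(Tup::field1));
        List<List<Tup>> out = new ArrayList<>();
        for (List<Tup> group : groups.values()) {
            for (Tup left : group) {
                if (!left.field2().equals(knownValue)) continue; // the filter
                for (Tup right : group) {
                    out.add(List.of(left, right));               // the cross product
                }
            }
        }
        return out;
    }
}
```

In a real Trident aggregator you would accumulate the group's tuples in the aggregator state and emit the pairs at batch completion; applying the known-value filter inside the aggregator (as here) avoids materializing pairs a downstream filter would drop anyway.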
Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
Are there other things that could cause this error? Since upgrading to 0.9.1-incubating, I've hit it twice. The first time I resolved it (in one project) by fixing an issue where two slf4j bindings were on the classpath together (strange, but it worked)... now I'm hitting the problem again in a different project and can't figure out what is causing it.

This is for a test which is submitting a topology to a LocalCluster; the full trace follows (it happens launching JUnit from Eclipse and from the Maven command line):

    java.lang.RuntimeException: java.lang.ClassNotFoundException: backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
        at backtype.storm.utils.Utils.deserialize(Utils.java:88)
        at backtype.storm.daemon.nimbus$read_storm_conf.invoke(nimbus.clj:89)
        at backtype.storm.daemon.nimbus$start_storm.invoke(nimbus.clj:724)
        at backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopologyWithOpts(nimbus.clj:962)
        at backtype.storm.daemon.nimbus$eval3974$exec_fn__1459__auto__$reify__3987.submitTopology(nimbus.clj:971)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
        at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28)
        at backtype.storm.testing$submit_local_topology.invoke(testing.clj:253)
        at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:34)
        at backtype.storm.LocalCluster.submitTopology(Unknown Source)
        at com.acuitysds.trident.TestTimeseriesAssembly.testBasicTopology(TestTimeseriesAssembly.java:108)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
        at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
        at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
    Caused by: java.lang.ClassNotFoundException: backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1913)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
Re: Topology submission exception caused by Class Not Found backtype.storm.daemon.nimbus$normalize_conf$get_merged_conf_val__3916$fn__3917
The tests are in the test tree (which I don't think Eclipse cares about, but it should get treated properly on the command line). The scope wasn't "provided"; it defaulted to "runtime"... and I've now corrected that, but it doesn't help.

Strangely, if I explicitly exclude logback-classic within the storm dependency, it does work, as long as I also add a dependency on slf4j-simple... so I am still seeing weird logger-induced classpath issues. This is what is working for me:

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <exclusions>
        <exclusion>
          <artifactId>logback-classic</artifactId>
          <groupId>ch.qos.logback</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
    </dependency>

WEIRD!

On Mon, Apr 21, 2014 at 8:54 PM, Jon Logan jmlo...@buffalo.edu wrote:

Are your Maven scopes right? The scope of the Storm dependency should be "provided", not "runtime". Also be sure that your main method / unit test is under your test/ classpath, not your main/ classpath.

On Mon, Apr 21, 2014 at 8:49 PM, Adam Lewis m...@adamlewis.com wrote:

Are there other things that could cause this error? Since upgrading to 0.9.1-incubating, I've hit it twice. The first time I resolved it (in one project) by fixing an issue where two slf4j bindings were on the classpath together (strange, but it worked)... now I'm hitting the problem again in a different project and can't figure out what is causing it.
This is for a test which is submitting a topology to a LocalCluster; the full trace follows (happens launching JUnit from Eclipse and from the Maven command line):

    [quoted stack trace snipped; identical to the trace in the previous message]
Re: Local Cluster serialization
From my observations, serialization in a LocalCluster only occurs if you have parallelism (more than one worker thread). I've also found that when going from LocalCluster to the real thing, there will always be a couple of unexpected twists and some tuning to do. Also, make sure to turn off the Java serialization fallback (Config#setFallBackOnJavaSerialization(false)) to verify that Kryo is working properly for all your types.

On Sat, Mar 29, 2014 at 5:24 PM, János Háber janos.ha...@finesolution.hu wrote:

Hi,

I have a little question: is serialization enabled in LocalCluster mode? For example, if I emit a Json4s JValue type, is it serializable by Kryo? How can I check without releasing to a normal cluster?

Thanks
b0c1
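One way to catch serialization problems without cranking up parallelism or deploying is to round-trip tuple values in a unit test. Note this sketch uses plain Java serialization, whereas Storm normally uses Kryo, so passing it does not prove Kryo can handle the type; it only flags the obvious failures early:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Round-trips a value through Java serialization, roughly what happens
// at a worker boundary. A rough stand-in only: Storm uses Kryo, so this
// catches blatantly non-serializable values, not Kryo-specific issues.
public class SerializationCheck {

    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException("value failed to round-trip", e);
        }
    }
}
```

For the Kryo path specifically, disabling the fallback as suggested above and running the LocalCluster with more than one worker thread remains the more faithful check.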
Re: DI with Storm
Yes, that is exactly right: the submission to Nimbus is in the form of a big Thrift message describing the topology. This message includes Java-serialized blobs of your topology components (spouts/bolts), which get instantiated within the JVM calling StormSubmitter. Typically you would pass configuration info to the constructor, while dependencies (e.g. a DB connection pool) are transient fields. Then in the prepare method (called after deserialization on the worker) you use the serialized configuration fields to initialize the transient ones. Of course Guice fits naturally into that step.

On Thu, Mar 27, 2014 at 12:37 AM, Software Dev static.void@gmail.com wrote:

Ok, so you would configure the map in the main method before submitting the topology, and then this conf can be used to create Guice injectors. Is that correct?

In the book Getting Started with Storm it states: "To customize a bolt, you should set parameters in its constructor and save them as instance variables so they will be serialized when submitting the bolt to the cluster." Does this mean bolts are instantiated on the client side before being submitted to nimbus/cluster?

On Wed, Mar 26, 2014 at 2:05 PM, Svend Vanderveken svend.vanderve...@gmail.com wrote:

The Storm configuration map is part of the arguments received by each prepare() method, in most Storm primitives, on each worker. It's serialized to each worker when a topology instance is started there. The initial Storm configuration map is provided at deploy time to Nimbus, in the class containing the main() method specified in the "storm jar blabla.jar some.class.here" command.

On Wed, Mar 26, 2014 at 4:42 PM, Software Dev static.void@gmail.com wrote:

How does one get the configuration map to each worker?
On Wed, Mar 26, 2014 at 6:41 AM, Adam Lewis m...@adamlewis.com wrote:

Or, since this is only being called from prepare at startup anyway, simpler:

    public class InjectorProvider {
        private static Injector injector;

        public static synchronized Injector get(Map conf) {
            if (injector == null) {
                injector = Guice.createInjector(
                    new DAOModule(conf),
                    new S3Module(conf));
            }
            return injector;
        }
    }

On Wed, Mar 26, 2014 at 9:26 AM, Svend Vanderveken svend.vanderve...@gmail.com wrote:

    private static Injector injector;

or better:

    private static volatile Injector injector;

see http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html

On Tue, Mar 25, 2014 at 9:55 PM, Patricio Echagüe patric...@gmail.com wrote:

It's fine. You can synchronize on a static monitor the creation of the injector per worker. That's how I do it:

    public class InjectorProvider {
        private static Injector injector;

        public static Injector get() {
            if (injector == null) {
                synchronized (InjectorProvider.class) {
                    if (injector == null) {
                        injector = Guice.createInjector(
                            new DAOModule(),
                            new S3Module());
                    }
                }
            }
            return injector;
        }
    }

On Tue, Mar 25, 2014 at 6:24 PM, Adam Lewis m...@adamlewis.com wrote:

"Doesn't Storm 0.9 have a prepare for the worker?" No, I don't think it does, but please point this out if I'm mistaken. I found the right JIRA issue though: https://issues.apache.org/jira/browse/STORM-126. Seems like the patch was well along but hasn't seen any recent activity.
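A self-contained version of the double-checked-locking variant discussed above, with the volatile field the linked Java Memory Model article calls for (Guice is omitted; the class simply counts its own constructions so the lazy single-construction behavior is observable in a test):

```java
// Double-checked locking with a volatile field, per the JMM article
// linked above. The construction counter exists only so a test can
// verify that exactly one instance is ever built.
public class LazySingleton {

    private static volatile LazySingleton instance;
    private static int constructions = 0;

    private LazySingleton() {
        constructions++; // only ever runs inside the synchronized block
    }

    public static LazySingleton get() {
        LazySingleton local = instance; // one volatile read on the fast path
        if (local == null) {
            synchronized (LazySingleton.class) {
                local = instance;
                if (local == null) {
                    instance = local = new LazySingleton();
                }
            }
        }
        return local;
    }

    public static int constructions() {
        return constructions;
    }
}
```

The local-variable copy of the volatile field is the standard refinement: the common already-initialized path then pays for a single volatile read instead of two.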
Re: date time in tuple
Also it might be worth reading: https://github.com/nathanmarz/storm/wiki/Serialization

After which you'll seek out this library: https://github.com/magro/kryo-serializers

On Thu, Mar 27, 2014 at 11:09 AM, Dan Guja dang...@gmail.com wrote:

Also it might be worth reading: https://github.com/nathanmarz/storm/wiki/Serialization

On Thu, Mar 27, 2014 at 9:01 AM, Dan Guja dang...@gmail.com wrote:

Try this:

    (DateTime) tuple.getValueByField("myDateTimeFieldName");

On Thu, Mar 27, 2014 at 8:50 AM, michael campbell michael.campb...@dsl.pipex.com wrote:

How do you put a datetime (let's say a Joda-Time DateTime value) in a tuple? And how do you get a datetime out of a tuple; what method corresponds to tuple.getLongByField for a datetime?

Michael Campbell
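The cast-on-retrieval pattern above can be wrapped in a small typed helper. In this sketch a Map stands in for Storm's Tuple, and java.time.Instant stands in for a Joda DateTime; the helper name is invented:

```java
import java.util.Map;

// A tuple stores a DateTime (or any object) as a plain value; there is
// no getDateTimeByField, so you cast on the way out. Class.cast gives a
// checked, generic version of the (DateTime) cast shown above.
public class TupleValues {

    public static <T> T getByField(Map<String, Object> tuple, String field, Class<T> type) {
        return type.cast(tuple.get(field));
    }
}
```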
Re: DI with Storm
As I recall there was an open issue for adding all the needed lifecycle hooks for integrating cleanly with something like Guice (worker lifecycle callbacks?) but I can't find it now... maybe STORM-172. In any case, I think there is an issue with not having quite enough exposed from Storm to properly integrate things like Guice.

Having said that, I'm using Guice now, but it is basically a hack until I can find a better way. The key to my hack is that injectors are not serializable, but modules are. I use the singleton pattern with a static accessor to hold the injector, which is created lazily (i.e. after topology components have been deserialized and are running in the worker VM). I instantiate my Guice module during topology build, and it gets serialized as part of the topology definition. In my Storm components, I pass the module to the singleton injector factory, which creates the injector from the passed module if one doesn't exist in the VM, and just returns the existing injector otherwise.

Finally, with an injector in hand for the prepare method, I can then either do an injector.getInstance() to create objects (e.g. inside a Trident state factory) or injector.injectMembers(this) in a prepare method if my topology component has transient @Inject fields for dependencies (since Storm controls the object lifecycle, I can't use constructor injection there).

The biggest problem with this hack is that it assumes all module instances are interchangeable (and identical), and the first component to init gets to build the injector. With the right lifecycle hooks, the injector could be created in a cleaner way.

On Tue, Mar 25, 2014 at 12:45 PM, Software Dev static.void@gmail.com wrote:

How would you go about using DI (Google Guice) with a Storm cluster? The worker nodes themselves will not have access to the Injector instance after job submission.

Thanks
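The "serializable configuration, transient dependencies" shape of the hack described above can be sketched without Guice. All names here are invented, and a StringBuilder stands in for a real injected DAO; the round-trip helper simulates the ship-to-worker serialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Only configuration travels with the topology; the heavy dependency is
// transient and rebuilt lazily on the worker after deserialization
// (which is where the injector would be consulted in the Guice version).
public class ConfiguredBolt implements Serializable {

    private final String tableName;      // serialized with the topology
    private transient StringBuilder dao; // stand-in for an injected DAO

    public ConfiguredBolt(String tableName) {
        this.tableName = tableName;
    }

    // Would run inside prepare() on the worker, after deserialization.
    public void prepare() {
        if (dao == null) {
            dao = new StringBuilder("dao:" + tableName);
        }
    }

    public String daoDescription() {
        prepare();
        return dao.toString();
    }

    // Simulates the serialize-then-deserialize trip a bolt takes
    // from the submitting JVM to a worker.
    public static ConfiguredBolt roundTrip(ConfiguredBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredBolt) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

After the round trip the transient field arrives null, exactly as it would on a worker, and prepare() rebuilds it from the serialized configuration.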
Re: serialization exception when using multiple worker/supervisors
You can register non-visible classes in code, but it is a pain. E.g., I had this issue with a Guava type which happened to be Java-serializable but had no easy way to implement a Kryo serializer (and I don't like the auto fall-back on Java serialization within Storm, since I prefer to know when things aren't getting handled by Kryo):

    List serializers = (List) stormConf.get(Config.TOPOLOGY_KRYO_REGISTER);
    serializers.add(Collections.singletonMap(
        "com.google.common.collect.RegularImmutableList",
        SerializableSerializer.class.getName()));

On Tue, Mar 25, 2014 at 11:56 AM, Vinay Pothnis vinay.poth...@gmail.com wrote:

Naresh: I cannot do that in code, because I cannot reference the private class to register it.

Samit: this is actually being used indirectly. I use the 'storm-rabbitmq' library (https://github.com/ppat/storm-rabbitmq), and that internally initializes the RabbitMQ ConnectionFactory. As part of that initialization, the default client properties are added, and that is where the ByteArrayLongString class is being referenced:
http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/2.5.1/com/rabbitmq/client/impl/AMQConnection.java#AMQConnection.defaultClientProperties%28%29

So, kinda stuck here.

Thanks
Vinay

On Tue, Mar 25, 2014 at 6:18 AM, Naresh naresh.bha...@gmail.com wrote:

Hey Vinay,

Did you specify the Kryo serialization parameter in all the yaml files on the different supervisors? An easier way is to specify this in code, as Srinath had suggested.

Regards
Naresh

On Mar 25, 2014, at 12:12 AM, Samit Sasan sasansa...@gmail.com wrote:

Hey Vinay, sorry to hear that... before we dwell on workarounds, can you describe what object exactly you are passing in the tuple, and can you extract your needed info into your own custom data-structure object and use that instead?

-Samit

On Tue, Mar 25, 2014 at 2:38 AM, Vinay Pothnis vinay.poth...@gmail.com wrote:

com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
Re: DI with Storm
Yeah, that's basically what I do. My point about being hackish is that if your module classes need configuration, you need to have that same configuration available from every call site of InjectorProvider#get() On Tue, Mar 25, 2014 at 9:55 PM, Patricio Echagüe patric...@gmail.com wrote: It's fine. You can synchronize with a static monitor the creation of the injector per worker. That's how I do it. public class InjectorProvider { private static Injector injector; public static Injector get() { if (injector == null) { synchronized (InjectorProvider.class) { if (injector == null) { injector = Guice.createInjector(new DAOModule(), new S3Module()); } } } return injector; } } On Tue, Mar 25, 2014 at 6:24 PM, Adam Lewis m...@adamlewis.com wrote: Doesn't Storm 0.9 have a prepare for the worker? No, I don't think it does, but please point this out if I'm mistaken. I found the right JIRA issue though: https://issues.apache.org/jira/browse/STORM-126 Seems like the patch was well along but hasn't seen any recent activity.
Re: Storm/Trident as a distributed query engine?
When evaluating storm, definitely take a closer look at the DRPC mechanism in Trident for your use case. To my knowledge there is no current support for data locality like you describe with Cassandra, although there was a discussion on the mailing list a couple of months ago around someone looking to do a school project and one of the popular suggestions was to implement a state-location-aware partitioning. As far as other projects, have you taken a look at druid (http://druid.io/)? It would represent an alternative to Cassandra in your current setup but is more suited to the types of multi-dimensional querying and aggregates you describe and can ingest sensor data in batch or realtime. On Fri, Mar 21, 2014 at 10:21 AM, Simon Chemouil schemo...@gmail.comwrote: Hi, I am very new to Storm and trying to evaluate whether it fits my needs or not. I work on a project where we compute reasonably simple queries (sum, average, mean, percentile...) on large amount of very simple structured data (counters of many sensors with a value every 5 minutes). We are currently reaching the limit of our architecture (still MySQL based) and moving to Cassandra for our data store. We want to also parallelize the queries to run on a cluster to be able to answer the queries as fast as possible. While Cassandra seems to be a good fit for our needs of data storage (quick access, good write performance, fault-tolerant, ...), we're still looking for a component which could help us distribute our queries over a cluster. I've been looking at Storm/Trident and running some tests/examples for the last few days, and while I do believe we could make it happen, I would like to have the opinion of an experienced Storm user/dev to know if it truly makes sense for our problem, since we don't really have a continuous stream of data. First, in the short-term, we want to run simple queries over the Cassandra store. 
I envision things this way: query -- [ QUEUE ] -- [ distribute/process queries ] -- answer/output Queries are discrete events; we don't want to keep state between them. We have some very simple queries and some more complex ones that require going through a lot of data (tens of millions of 'cells'), so we want to be able to *cut down* big queries into smaller pieces (most probably divided by time range) both to reply faster and to prevent big queries from taking all resources. We would like to send the results of the query straight into another Cassandra CF and to an endpoint in our system. Finally, because of some non-technical business requirements (i.e., our clients' IT team's reluctance to give us more servers ;)) we will have to host the 'workers' on the same servers as the Cassandra nodes. I thought it could make sense to use Cassandra's token aware policy to always try to make workers fetch data locally. This would allow us to piggyback on Cassandra's load balancing since we use random partitioning that normally evenly distributes the rows across our cluster, and a row is small enough to compute on without breaking the task down further. Is it possible with Storm/Trident to direct the way computations are distributed (i.e., to which workers 'tasks' are sent) or is it going against its design? All-in-all, how good a fit is Storm for this use case? What about Trident? If the project isn't a good fit, do you know of other open-source projects that address this need? The current alternative I envision is designing a homebrew solution using Akka. Any opinion is greatly appreciated! Thanks a lot for your help! Simon Chemouil
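Independently of Storm, the "cut big queries into smaller pieces by time range" step reduces to splitting [start, end) into shards, computing a partial aggregate per shard (the part you would hand to parallel workers), and merging the partials. A minimal stdlib sketch, with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

class ShardedQuery {
    // Split the half-open range [start, end) into at most n contiguous shards.
    static List<long[]> shards(long start, long end, int n) {
        List<long[]> out = new ArrayList<>();
        long span = end - start;
        long step = Math.max(1, (span + n - 1) / n); // ceil division
        for (long s = start; s < end; s += step) {
            out.add(new long[] { s, Math.min(s + step, end) });
        }
        return out;
    }

    // Sum a per-timestamp metric over [start, end), shard by shard, the way
    // parallel workers would, then merge the partial sums.
    static long shardedSum(long start, long end, int n, LongUnaryOperator metric) {
        long total = 0;
        for (long[] shard : shards(start, end, n)) {
            long partial = 0;
            for (long t = shard[0]; t < shard[1]; t++) {
                partial += metric.applyAsLong(t);
            }
            total += partial; // merge step
        }
        return total;
    }
}
```

Because sum, count, and (via sum/count pairs) average are associative merges, the sharded result equals the unsharded one; percentiles need a mergeable sketch instead of a plain partial.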
Re: User Interface
I really like this; I've been wondering about the easiest way to visualize the storm topology graph; especially since I'm a heavy trident user so my topologies are generated...with really long names for the bolts. What you're showing here solves that and adds operational metrics to boot, which is great. Have you considered building the backend on top of the existing storm UI app? You might be able to leverage existing code to talk to nimbus. It would be awesome if storm provided something like what you've demoed out of the box on the same server infrastructure. On Thu, Mar 20, 2014 at 8:28 AM, Middleton, Jonathan jonathan.middle...@citi.com wrote: Very pretty. Hope you carry this forward. You might consider sampling from Nimbus using Thrift. *From:* Klausen Schaefersinho [mailto:klaus.schaef...@gmail.com] *Sent:* Thursday, March 20, 2014 8:21 AM *To:* user@storm.incubator.apache.org *Subject:* Re: User Interface Hi, it is just a prototype yet with no backend... So i still have to decide what backend to use... Cheers, Klaus On Thu, Mar 20, 2014 at 1:15 PM, Jean-Sebastien Vachon jean-sebastien.vac...@wantedanalytics.com wrote: Looks very promising. I like the fact that you see the flow of tuples between elements. Any idea when you will release this so that we can try it out with our topology? *From:* Klausen Schaefersinho [mailto:klaus.schaef...@gmail.com] *Sent:* March-18-14 4:41 PM *To:* user@storm.incubator.apache.org *Subject:* User Interface Hi, I have been prototyping an alternative UI for monitoring a storm cluster. The main driver was, that I would like to have a more condensed view of what is going on in my cluster and that I would also like to monitor some use case specific metrics, e.g. the training accuracy of a classifier or so. 
I have put a dummy online: http://vommond.de/streamui/streamui.html http://vommond.de/streamui/streamui-dark.html The width and the color of the lines between the boxes correlate to the number of events flowing between the spouts and bolts. The widgets in the boxes show some other (dummy) metrics that one might, for instance, monitor. You can customize the UI quite a lot, e.g. hide widgets or change their type (but all changes are lost in the dummy). Before I move on, I have some questions: 1) For now I have not integrated the UI with any backend. So before I start, I would be happy if anybody could give me some advice on what kind of monitoring backend to use. Ideally the backend would be: - Easy to integrate with storm and - Could also read the normal storm metrics. - Have custom metrics variables to monitor. Also I do not have too much time, so any kind of solution that has a REST service and would not require any server-side hacking would be a big plus. 2) I saw that quite some people are using Ganglia or Nagios, but I have never used either. Can anybody share their experience? Of course all comments with regards to the UI are welcome. I am planning to release the source code in a few days (after some cleaning up), so anybody is welcome to pick it up. Cheers, Klaus
Re: Nimbus Fails After Uploading Topology - Reading Too Large of Frame Size
Robert-- I just saw this thread now. I had the same issue recently myself. Taylor posted the full conversation we had back to the list, but you may not have noticed it since it was several messages squashed into one. In any case, I think you can fix this by increasing the max allowable buffer on Nimbus in storm.yaml using the config option nimbus.thrift.max_buffer_size (it is specified in bytes). The default is something like 1MB, I upped mine to 32MB to accommodate a 6MB topology. It looks like your topology is only about 2MB so you might not need to go that extreme. I think the only downside of increasing this is that you lose the protection it was designed to provide: namely a malicious/buggy client connecting to Nimbus can exhaust Nimbus memory by sending really big thrift messages. On Thu, Mar 13, 2014 at 11:44 AM, Robert Lee lee.robert...@gmail.comwrote: Downgrading to storm-0.9.0.1 remedied the issue for me. I no longer receive any exceptions. I'm fairly certain nothing is listening to the thrift port besides the storm jar command I issue. The only thing I can think of is I am connected to the nimbus node to view the logs in real time. When I have finished other things on my plate, I'll revisit this issue to try to find a cause. On Wed, Mar 12, 2014 at 2:56 PM, P. Taylor Goetz ptgo...@gmail.comwrote: Hi Robert, let me know if you experience the same issue with 0.9.0.1. One thing that caught my eye was this: 2014-03-11 22:38:32 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 2064605, which is bigger than the maximum allowable buffer size for ALL connections. In 0.9.1 that can indicate that something other than the thrift client (like ssh, telnet, or a security scanner) is accessing nimbus' thrift port. More details here (https://github.com/apache/incubator-storm/pull/3). The fact that you can run a different topology class from the same jar file suggests that it is not a problem with the upload process. 
Is it possible that something in your topology (like a cassandra client, etc.) is misconfigured to point to nimbus' host and port? - Taylor On Mar 11, 2014, at 7:48 PM, Robert Lee lee.robert...@gmail.com wrote: I will downgrade to storm-0.9.0.1 and see if the error persists in that version as well. On Tue, Mar 11, 2014 at 7:47 PM, Robert Lee lee.robert...@gmail.com wrote: Yes -- more details: Storm version: 0.9.1-incubating installed using a variant of your storm-vagrant deployment (https://github.com/ptgoetz/storm-vagrant). Cluster setup: two supervisor nodes with 1024m, nimbus with 1024m, zookeeper (3.3.5) 512mb node, and a kafka (0.8.0) 512mb node. Persisting to a local cassandra cluster. Here's an example topology I'm running. This topology works both in local and distributed mode. A variant of this topology (more persisting and more complicated functions on the kafka stream) works in local mode but gives the thrift error reported above when submitting. public class SentenceAggregationTopology { private final BrokerHosts brokerHosts; public SentenceAggregationTopology(String kafkaZookeeper) { brokerHosts = new ZkHosts(kafkaZookeeper); } public StormTopology buildTopology() { return buildTopology(null); } public StormTopology buildTopology(LocalDRPC drpc) { TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "storm-sentence", "storm"); kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig); KafkaSentenceMapper mapper = new KafkaSentenceMapper("playlist", "testtable", "word", "count"); TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("kafka", kafkaSpout).shuffle(). each(new Fields("str"), new WordSplit(), new Fields("word")). groupBy(new Fields("word")).
persistentAggregate( CassandraBackingMap.nonTransactional(mapper), new Count(), new Fields("aggregates_words")) .parallelismHint(2); topology.newDRPCStream("words", drpc) .each(new Fields("args"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); return topology.build(); } public static void main(String[] args) throws Exception { final int TIME_INTERVAL_IN_MILLIS = 1000; String kafkaZk = args[0]; SentenceAggregationTopology sentenceAggregationTopology = new SentenceAggregationTopology(kafkaZk); Config config = new Config(); config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS,
nimbus.thrift.max_buffer_size
Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new thrift max buffer size (nicely logged on the server side, although the client just gets a broken pipe stack trace from thrift) with an approx 6 MB message(!). Increasing the configured limit solves the problem, but I would have thought the 1MB default should be enough. Does the storm submitter encode the entire topology as a single thrift message? I'm really surprised that the message is coming out so large; my topology isn't exactly small, but it only has about 20 bolts...does anyone have any suggestions on how to determine why the message is so large? Is this within the realm of what others have seen or am I doing something wrong? Thanks, Adam
Re: nimbus.thrift.max_buffer_size
It isn't the jar file, but something about the topology itself; I have a submitter program that submits four topologies all from the same jar. Upon submitting the first topology, the jar is uploaded and topology starts, then the submitter submits two more topologies whilst reusing the uploaded jar. The broken pipe occurs when trying to submit the fourth (large) topology. That is why I was assuming the large message was actually the encoded topology itself. This is reproducible and the errors are as follows: nimbus.log: 2014-03-18 18:16:39 o.a.t.s.TNonblockingServer [ERROR] Read a frame size of 6644632, which is bigger than the maximum allowable buffer size for ALL connections. storm jar console: 2321 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar /Users/adam/git/impl/impl-storm/target/impl-storm-0.0.1-SNAPSHOT.jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar 97762 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar 97762 [main] INFO backtype.storm.StormSubmitter - Submitting topology global__topo_forecastRuntime in distributed mode with conf 
{topology.fall.back.on.java.serialization:false,topology.workers:2,drpc.servers:[10.118.57.229],topology.debug:false,topology.kryo.register:[{org.joda.time.DateTime:de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},{org.joda.time.Interval:de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer},com.mycompany.data.Simulated,com.mycompany.data.SomeClass1,com.mycompany.ml.SomeClass2,com.mycompany.model.SomeClass3,com.mycompany.model.SomeClass4,{com.mycompany.ml.SomeClass4:com.esotericsoftware.kryo.serializers.DefaultSerializers$EnumSerializer},{java.math.BigDecimal:com.esotericsoftware.kryo.serializers.DefaultSerializers$BigDecimalSerializer},{java.sql.Date:de.javakaffee.kryoserializers.DateSerializer},{com.tdunning.math.stats.TDigest:com.mycompany.trident.tdigest.TDigestSerializer},{java.lang.Class:com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer},{java.util.UUID:de.javakaffee.kryoserializers.UUIDSerializer},{com.google.common.collect.RegularImmutableList:backtype.storm.serialization.SerializableSerializer}],topology.max.spout.pending:16,topology.message.timeout.secs:900,drpc.request.timeout.secs:45} java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:112) at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58) at com.mycompany.runtime.DeployStormTopologies.main(DeployStormTopologies.java:92) Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147) at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157) at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65) at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:156) at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:145) at 
backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98) ... 2 more Caused by: java.net.SocketException: Broken pipe at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) at java.net.SocketOutputStream.write(SocketOutputStream.java:153) at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145) ... 7 more On Tue, Mar 18, 2014 at 4:12 PM, P. Taylor Goetz ptgo...@gmail.com wrote: It uploads the file in small (1024*5 bytes) chunks. Does this happen every time (i.e. reproducible)? What is the size of your topology jar? Can you post the server side message (I want to see the length it output). - Taylor On Mar 18, 2014, at 3:40 PM, Adam Lewis superca...@gmail.com wrote: Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the new thrift max buffer size (nicely logged on the server side, although the client just gets a broken pipe stack trace form thrift) with an approx 6 MB message(!). Increasing the configured limit solves the problem, but I would have thought the 1MB default should be enough. Does the storm submitter encode the entire topology as a single thrift message? I'm really surprised that the message is coming out so large, my topology isn't exactly small, but it only has about 20 bolts...does anyone have any suggestions on how to determine why the message is so large? Is this within the realm of what others have seen or am I doing something wrong? Thanks, Adam
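For reference, the fix discussed in this thread is a one-line storm.yaml change on the nimbus host. The option takes bytes; the 32 MB value below matches the figure mentioned earlier in the digest and is illustrative, not a recommendation:

```yaml
# storm.yaml on the nimbus host
nimbus.thrift.max_buffer_size: 33554432   # 32 MB in bytes (default is ~1 MB)
```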
Re: Where does system.out gets written?
import org.slf4j.Logger; import org.slf4j.LoggerFactory; ... public static Logger LOG = LoggerFactory.getLogger(MyClass.class); Check out the slf4j API for details On Thu, Mar 13, 2014 at 9:57 AM, Manthosh Kumar T manth...@gmail.com wrote: Hi, Thanks. How should I initialize the logger object? On 13 March 2014 19:15, James Xu xumingmi...@gmail.com wrote: Just use logger, it will appear in $STORM_HOME/logs/worker-xxx.log On 2014年3月13日, at 下午9:26, Manthosh Kumar T manth...@gmail.com wrote: Hi All, I'm new to storm. My topology runs in Local mode without any error. But when I try to submit it to the cluster, it doesn't work. I don't know where to check for errors. To debug I had added some System.out.println() statements. Where do these get printed when running in a cluster? If they're not printed anywhere, how can I use storm's logger to log into supervisor.log or anywhere else for that matter? -- Cheers, Manthosh Kumar. T -- Cheers, Manthosh Kumar. T
Re: Storm Message Size
Hi Klaus, I've been dealing with similar use cases. I do a couple of things (which may not be a final solution, but it is interesting to discuss alternate approaches): I have passed trained models in the 200MB range through storm, but I try to avoid it. The model gets dropped into persistence and then only the ID of the model is passed through the topology. So my training bolt passes the whole model blob to the persistence bolt and that's it...in the future I may even remove that step so that the model blob never gets transferred by storm. Also, I use separate topologies for training, and those tend to have much higher timeouts because the train aggregator can take quite a while. Traditionally this would probably happen in Hadoop or some other batch system, but I'm too busy to do the setup and storm is handling it fine anyway. I don't have to do any polling because I have model selection running as a logically different step, i.e. a tuple shows up for prediction, a selection step runs which finds the model ID for scoring that tuple, then it flows on to an actual scoring bolt which retrieves the model based on ID and applies it to the tuple. If the creation of a new model leads you to re-score old tuples, you could use the model write to trigger those tuples to be replayed from some source of state such that they will pick up the new model ID and proceed as normal. Best, Adam On Wed, Feb 26, 2014 at 7:54 AM, Klausen Schaefersinho klaus.schaef...@gmail.com wrote: THX, the idea is good, I will keep that in mind. The only drawback is that it relies on polling, which I do not like too much in the PredictionBolt. Of course I could also pass S3 or File references around in the messages, to trigger an update. But for the sake of simplicity I was thinking of keeping everything in storm and not relying on other systems if possible. 
Cheers, Klaus On Wed, Feb 26, 2014 at 12:22 PM, Enno Shioji eshi...@gmail.com wrote: I can't comment on how large tuples fare, but about the synchronization, would this not make more sense?

InputSpout -> AggregationBolt -> PredictionBolt -> OutputBolt
                   |                  ^
                   v                  |
              Agg. State         Model State
                   |                  ^
                   v                  |
                   +--> TrainingBolt -+

I.e. AggregationBolt writes to AggregationState, which is polled by TrainingBolt, which writes to ModelState. ModelState is then polled by PredictionBolt. This way, you can get rid of the large tuples as well and instead use something like S3 for these large states. On Wed, Feb 26, 2014 at 11:02 AM, Klausen Schaefersinho klaus.schaef...@gmail.com wrote: Hi, I have a topology which processes events, aggregates them in some form, and performs some prediction based on a machine learning (ML) model. Every x events, one of the bolts involved in the normal processing emits a trainModel event, which is routed to a bolt which is just dedicated to the training. Once the training is done, the new model should be sent back to the prediction bolt. The topology looks like:

InputSpout -> AggregationBolt -> PredictionBolt -> OutputBolt
                   |                  ^
                   v                  |
                   +-- TrainingBolt --+

The model can get quite large (100 MB) so I am not sure how this would impact the performance of my cluster. Does anybody have experience with transmitting large messages? Also, the training might take a while, so the aggregation bolt should not trigger the training bolt if it is busy. Is there an established pattern for how to achieve this kind of synchronization? I could have some streams to send states, but then I would mix the data stream with a control stream, which I really would like to avoid. An alternative would be to use ZooKeeper and perform the synchronization there. Last but not least, I could also make the aggregation bolt write into a database and have the training bolt periodically wake up and read the database. Does anybody have experience with such a setup? Kind Regards, Klaus
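The "persist the blob, pass only its ID" approach described above can be sketched with a map standing in for the persistence layer (class and method names are hypothetical; in the real setup the store would be Cassandra, S3, or similar):

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class ModelStore {
    // Stand-in for external persistence (Cassandra, S3, ...).
    private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();

    // Training side: write the (large) model once, emit only its ID downstream.
    String save(byte[] modelBlob) {
        String id = UUID.randomUUID().toString();
        blobs.put(id, modelBlob);
        return id; // this small string is what flows through the topology
    }

    // Prediction side: resolve the ID selected for a tuple back to its model.
    byte[] load(String modelId) {
        return blobs.get(modelId);
    }
}
```

The tuples then carry a short string instead of a 100+ MB payload, and the prediction bolt fetches (and can cache) the model by ID in prepare() or on first use.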
Re: Unexpected behavior on message resend
In my case it was the state objects created as part of trident aggregation. Here is the final message in the thread (i.e. read bottom up): http://mail-archives.apache.org/mod_mbox/storm-user/201312.mbox/%3CCAAYLz+p4YhF+i3LAkFoyU3nvngZXOusZWXj=0+bynrx0+tg...@mail.gmail.com%3E On Wed, Feb 26, 2014 at 10:35 AM, Harald Kirsch harald.kir...@raytion.com wrote: Hi Adam, ok, good to know. I resolved to create the tuple from scratch in case it needs to be resent. I don't see where else in-place modification could hurt in a linear process. Am I missing something? Thanks, Harald. On 26.02.2014 15:48, Adam Lewis wrote: I've already gotten slapped around on the list for doing in-place modifications, so let me pass it on :) Don't modify tuple objects in place. You shouldn't rely on serialization happening or not happening for correctness. On Mon, Feb 24, 2014 at 11:18 AM, Harald Kirsch harald.kir...@raytion.com wrote: Hi all, my TOPOLOGY_MESSAGE_TIMEOUT_SECS was slightly too low. I got a fail for a tuple and the spout just resent it. One bolt normalizes a date in place in a field of the tuple. After the spout resent the tuple, I got errors from the date parser because the date was already normalized. Since I currently have only one node, I know of course what happens. The tuple was just the very same object that was already partially processed when the timeout hit. In a distributed setup I envisage the bolt to be on another machine with a serialized copy of the spout's tuple such that changes to the tuple are not reflected in the original. Would that be true? I reckon from this that all processing in bolts needs to be idempotent if I want to be able to replay failed tuples. Is that true or am I doing something wrong? Harald. 
-- Harald Kirsch Raytion GmbH Kaiser-Friedrich-Ring 74 40547 Duesseldorf Fon +49-211-550266-0 Fax +49-211-550266-19 http://www.raytion.com
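The replay hazard in this thread comes from mutating a tuple's field in place; normalizing into a fresh value, and tolerating already-normalized input, keeps the operation idempotent. A stdlib sketch (the two date formats are assumed for illustration, not taken from Harald's code):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

class DateNormalizer {
    // Normalize "dd.MM.yyyy" to "yyyy-MM-dd", returning a NEW value instead of
    // rewriting the input field. Already-normalized input passes through, so a
    // replayed tuple does not blow up the parser.
    static String normalize(String raw) {
        if (raw.matches("\\d{4}-\\d{2}-\\d{2}")) {
            return raw; // idempotent: replayed tuples are already normalized
        }
        try {
            SimpleDateFormat in = new SimpleDateFormat("dd.MM.yyyy", Locale.ROOT);
            in.setLenient(false);
            SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
            return out.format(in.parse(raw));
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable date: " + raw, e);
        }
    }
}
```

A bolt would emit the returned value as a new field rather than assigning it back into the received tuple.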
Re: Alternative classpath settings apart from fatjar
Is it possible for you to use maven to build your fat jar? Check out the maven shade plugin, which has handling for things like META-INF, and some special cases (like services manifests) such as merging those from multiple jar files. http://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html The (not recommended) alternative is to place your JARs in the supervisor's lib directory. On Wed, Feb 19, 2014 at 9:15 AM, Harald Kirsch harald.kir...@raytion.com wrote: I am trying to use tika-parsers-1.4.jar in a self-built fat jar, meaning I just unjar all jars into one directory and jar the result into one fat jar. The problem is that this produces an unreliable META-INF, so just before running jar, I delete the META-INF directory. This seemed to work, except that tika-parsers does not work anymore. No error, no exception, no log, it just returns nothing. And I can boil this down to the META-INF. Now, there might be a way to hand-craft a fat jar that works. However, I wonder if there is a better way to add additional jars to a topology? Harald.
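Tika's parsers are wired up through META-INF/services files, which an unjar-everything merge (or deleting META-INF outright) clobbers; shade's ServicesResourceTransformer concatenates those entries instead of letting one jar overwrite another's. A minimal pom.xml fragment, as a sketch (execution details are illustrative):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- merges META-INF/services/* entries instead of overwriting them -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```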
Re: Alternative classpath settings apart from fatjar
You can put the jars in $STORM_HOME/lib/ on each of the supervisor nodes. There are a couple of reasons why this is not recommended: 1) you need to be careful not to override any built-in storm libraries since this would be effectively changing storm and will have unpredictable results 2) you are creating a management issue for yourself...with just one topology it is probably fine, but consider that as you add and remove supervisor nodes to your cluster you need to ensure that the lib directory stays synced with your dependencies or else you'll get spurious class loading issues...it is pretty much equivalent to the reasoning behind _not_ putting application dependency jars in your JVM's lib directory. To put it another way, adding a dependency to your topology goes from being a dev task to an ops task. On Wed, Feb 19, 2014 at 9:45 AM, Harald Kirsch harald.kir...@raytion.comwrote: Hi Adam, thanks for the hint. Maven is not part of our build environment. We are using an ant/ivy combination. Which one would be the supervisor's lib directory? Why is it not recommended to be used? We intend to run only one dedicated topology anyway, so interference between those is not an issue. Harald On 19.02.2014 15:25, Adam Lewis wrote: Is it possible for you to use maven to build your fat jar? Check out the maven shade plugin, which has handling for things like META-INF, and some special cases (like services manifests) such as merging those from multiple jar files. http://maven.apache.org/plugins/maven-shade-plugin/examples/resource- transformers.html The (not recommended) alternative is to place your JARs in the supervisor's lib directory. On Wed, Feb 19, 2014 at 9:15 AM, Harald Kirsch harald.kir...@raytion.com mailto:harald.kir...@raytion.com wrote: I am trying to use tika-parsers-1.4.jar in a self-build fat jar, meaning I just unjar all jars into one directory and jar the result into one fat jar. 
The problem is, that this produces an unreliable META-INF, so just before running jar, I delete the META-INF directory. This seemed to work, except that tika-parsers does not work anymore. No error, no exception, no log, it just returns nothing. And I can boil this down to the META-INF. Now, there might be a way to hand-craft a fat jar that works. However I wonder if there is a better way to add additional jars to a topology? Harald. -- Harald Kirsch Raytion GmbH Kaiser-Friedrich-Ring 74 40547 Duesseldorf Fon +49-211-550266-0 Fax +49-211-550266-19 http://www.raytion.com
Re: Streaming DRPC?
It might help to understand your use case a little bit better. Are the requests for a finite amount of data that you just happen to want to stream out, or are they more akin to a subscription for an unbounded amount of data? Also, does the request contain a specification of what needs to be computed? That is, do you imagine your bolts (or various trident components) each keeping a list of active requests and modifying their behaviour (or data source in the case of spouts) as the requests come in? You might want to check out the storm-signals project for ideas of how you could orchestrate all the components to do the right thing in face of your requests coming in: https://github.com/ptgoetz/storm-signals On Sat, Feb 15, 2014 at 7:30 PM, Carl Lerche m...@carllerche.com wrote: Hey Adam, Actually, that's quite a good idea. I'm glad you responded, this is a better approach than what I was going to attempt (aka, mega hacks). I understand how your approach could be done with non-DRPC trident. The one drawback that I can think of would be that the request message would need to be part of a batch, so if matches only happen every 15~20 seconds, it would take a while for the response to start arriving. Perhaps you could elaborate more on how one could use vanilla storm could be used to solve this? Perhaps it could help with the batch delay problem. Cheers, Carl On Sat, Feb 15, 2014 at 4:42 AM, Adam Lewis m...@adamlewis.com wrote: Hi Carl, DRPC is inherently synchronous in the way it works so if I understand what you are trying to do correctly then I suggest you stick to non-DRPC trident or even vanilla storm. You can setup some messaging queues to handle the input (request) and output (streaming result). 
Include a field in the input tuple that can be used to correlate any downstream results (where that ID is client generated), then create a client which handles publishing to the input queue and subscribing to the output queue (filtering on messages which have the input correlation id). You can partition your storm topology (and the input and output queues) on that correlation ID to achieve some load balancing. Finally, if your output streams are infinite, you need some mechanism to stop them... As a side benefit, you can overcome some limitations of DRPC such as no control over serialization and even have multiple trident streams all writing to your output queue (whereas DRPC doesn't support that sort of branching in the topology). Adam On Fri, Feb 14, 2014 at 7:06 PM, Carl Lerche m...@carllerche.com wrote: Hello, I noticed that the DRPC client only allows a single response (and for that response to be JSON encoded). I was hoping to implement some sort of DRPC stream where I a constant stream of data based on a given query. Also, is there a way to serialize the response using Kryo instead of JSON? The specific format of my data is not very JSON friendly. Cheers, Carl
Re: Streaming DRPC?
Hi Carl, DRPC is inherently synchronous in the way it works so if I understand what you are trying to do correctly then I suggest you stick to non-DRPC trident or even vanilla storm. You can setup some messaging queues to handle the input (request) and output (streaming result). Include a field in the input tuple that can be used to correlate any downstream results (where that ID is client generated), then create a client which handles publishing to the input queue and subscribing to the output queue (filtering on messages which have the input correlation id). You can partition your storm topology (and the input and output queues) on that correlation ID to achieve some load balancing. Finally, if your output streams are infinite, you need some mechanism to stop them... As a side benefit, you can overcome some limitations of DRPC such as no control over serialization and even have multiple trident streams all writing to your output queue (whereas DRPC doesn't support that sort of branching in the topology). Adam On Fri, Feb 14, 2014 at 7:06 PM, Carl Lerche m...@carllerche.com wrote: Hello, I noticed that the DRPC client only allows a single response (and for that response to be JSON encoded). I was hoping to implement some sort of DRPC stream where I a constant stream of data based on a given query. Also, is there a way to serialize the response using Kryo instead of JSON? The specific format of my data is not very JSON friendly. Cheers, Carl
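The correlation-ID idiom Adam describes, reduced to its core: the client tags each request with a generated ID and keeps only responses carrying that ID. In-memory queues stand in for the real request/response messaging (all names here are illustrative):

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class CorrelatedClient {
    // Request/response messages carry the correlation id as [id, payload].
    final BlockingQueue<String[]> requests = new LinkedBlockingQueue<>();
    final BlockingQueue<String[]> responses = new LinkedBlockingQueue<>();

    // Publish a request tagged with a fresh client-generated correlation id.
    String submit(String payload) {
        String id = UUID.randomUUID().toString();
        requests.add(new String[] { id, payload });
        return id;
    }

    // Drain currently queued responses, returning the first one carrying our
    // correlation id, or null if none has arrived yet. Responses for other
    // requests are discarded here; a real client would filter in its
    // subscription instead, and partitioning the queues on the id gives the
    // load balancing mentioned above.
    String pollMatching(String id) {
        String[] msg;
        while ((msg = responses.poll()) != null) {
            if (msg[0].equals(id)) {
                return msg[1];
            }
        }
        return null;
    }
}
```

Because the topology writes results to the output queue as they are produced, the client sees a stream of correlated responses rather than DRPC's single synchronous reply; for infinite streams some end-of-stream or cancel message is still needed.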
Re: storm jar slowness
thanks for the pointer... it looks like it is using default 15K chunk sizes, I'll see if tweaking that has any effect.

On Fri, Feb 14, 2014 at 5:48 PM, Jon Logan jmlo...@buffalo.edu wrote:

The code that uploads it is at backtype.storm.StormSubmitter#submitJar(java.util.Map, java.lang.String). It looks like it's just a simple upload over Thrift... SCP is specifically designed for file uploads, and is probably better-tuned for large transfers, through compression or whatever other means. You could just upload the jar using SCP, and then submit it from the server itself. I think many (most?) use cases are submitted on a local network, where upload speed is not a concern.

On Fri, Feb 14, 2014 at 5:18 PM, Adam Lewis superca...@gmail.com wrote:

I've seen this issue raised on the list in the past, but with no clear suggestions: storm jar is very slow at sending the jar file, averaging about 250 KB/s between my system and EC2... is there some reason for this in the way storm sends the jar? scp goes about 2.5 MB/s, the same 10x difference I've seen reported previously. I'm using storm 0.9.0.1.

Any ideas?

Thanks,
Adam
Re: storm jar slowness
This is promising: 150 KB chunk size is giving me over 1 MB/s; 300 KB chunk size gets up to 2 MB/s, although with spikier performance. My experimental setup leaves a lot to be desired here, but it seems pretty conclusive that 15 KB is not optimal (at least over the Internet). As far as I can tell, the only downside of larger chunks is that Thrift keeps an entire chunk in memory. Something in the 100 to 200 KB range seems reasonable. Any thoughts? Perhaps I should open a JIRA ticket for this.

As for use cases: I'm sure in production everything will be on the same network. For me, it is the transition from using a local cluster to deploying to a real cluster (which happens to run on AWS, across the Internet from my dev machine) and dealing with classpath, serialization, and other fun issues that don't crop up in local mode... and my fat jar hasn't been put on a diet yet, so it is large... all of which adds up to long code/build/test cycle times. But increasing the chunk size is really helping, and the 15 KB default seems arbitrary. If this seems reasonable I'll file a JIRA and a PR.

On Fri, Feb 14, 2014 at 7:57 PM, Adam Lewis gm...@adamlewis.com wrote:

thanks for the pointer... it looks like it is using default 15K chunk sizes, I'll see if tweaking that has any effect.

On Fri, Feb 14, 2014 at 5:48 PM, Jon Logan jmlo...@buffalo.edu wrote:

The code that uploads it is at backtype.storm.StormSubmitter#submitJar(java.util.Map, java.lang.String). It looks like it's just a simple upload over Thrift... SCP is specifically designed for file uploads, and is probably better-tuned for large transfers, through compression or whatever other means. You could just upload the jar using SCP, and then submit it from the server itself. I think many (most?) use cases are submitted on a local network, where upload speed is not a concern.

On Fri, Feb 14, 2014 at 5:18 PM, Adam Lewis superca...@gmail.com wrote:

I've seen this issue raised on the list in the past, but with no clear suggestions: storm jar is very slow at sending the jar file, averaging about 250 KB/s between my system and EC2... is there some reason for this in the way storm sends the jar? scp goes about 2.5 MB/s, the same 10x difference I've seen reported previously. I'm using storm 0.9.0.1.

Any ideas?

Thanks,
Adam
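A rough back-of-the-envelope sketch of why chunk size matters over a high-latency link, assuming each chunk upload costs roughly one network round trip. The jar size, RTT, and bandwidth numbers below are illustrative assumptions, not measurements from this thread:

```java
/**
 * Simple model: total upload time ~ (number of chunks) * RTT
 * + (total bytes) / bandwidth. With a fixed per-chunk round-trip cost,
 * small chunks make the transfer latency-bound.
 */
public class ChunkEstimate {
    /** Ceiling division: how many chunks a jar of jarBytes needs. */
    static long chunks(long jarBytes, long chunkBytes) {
        return (jarBytes + chunkBytes - 1) / chunkBytes;
    }

    /** Estimated seconds to upload under the model above. */
    static double uploadSeconds(long jarBytes, long chunkBytes,
                                double rttSeconds, double bytesPerSecond) {
        return chunks(jarBytes, chunkBytes) * rttSeconds
                + jarBytes / bytesPerSecond;
    }

    public static void main(String[] args) {
        long jar = 50L * 1024 * 1024;   // assume a 50 MB fat jar
        double rtt = 0.08;              // assume 80 ms dev-machine-to-EC2 RTT
        double bw = 2.5 * 1024 * 1024;  // 2.5 MB/s link (the scp figure above)

        System.out.printf("15 KB chunks:  %.0f s%n",
                uploadSeconds(jar, 15 * 1024, rtt, bw));
        System.out.printf("150 KB chunks: %.0f s%n",
                uploadSeconds(jar, 150 * 1024, rtt, bw));
    }
}
```

Under these assumed numbers, 15 KB chunks need roughly ten times as many round trips as 150 KB chunks, which is consistent with the ~10x throughput gap reported in the thread.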
Re: storm jar slowness
Great. STORM-241 has been filed: https://issues.apache.org/jira/browse/STORM-241

On Fri, Feb 14, 2014 at 8:42 PM, Nathan Marz nat...@nathanmarz.com wrote:

Yes, it is arbitrary. Opening an issue for this is a good idea.

On Fri, Feb 14, 2014 at 5:24 PM, Adam Lewis gm...@adamlewis.com wrote:

This is promising: 150 KB chunk size is giving me over 1 MB/s; 300 KB chunk size gets up to 2 MB/s, although with spikier performance. My experimental setup leaves a lot to be desired here, but it seems pretty conclusive that 15 KB is not optimal (at least over the Internet). As far as I can tell, the only downside of larger chunks is that Thrift keeps an entire chunk in memory. Something in the 100 to 200 KB range seems reasonable. Any thoughts? Perhaps I should open a JIRA ticket for this.

As for use cases: I'm sure in production everything will be on the same network. For me, it is the transition from using a local cluster to deploying to a real cluster (which happens to run on AWS, across the Internet from my dev machine) and dealing with classpath, serialization, and other fun issues that don't crop up in local mode... and my fat jar hasn't been put on a diet yet, so it is large... all of which adds up to long code/build/test cycle times. But increasing the chunk size is really helping, and the 15 KB default seems arbitrary. If this seems reasonable I'll file a JIRA and a PR.

On Fri, Feb 14, 2014 at 7:57 PM, Adam Lewis gm...@adamlewis.com wrote:

thanks for the pointer... it looks like it is using default 15K chunk sizes, I'll see if tweaking that has any effect.

On Fri, Feb 14, 2014 at 5:48 PM, Jon Logan jmlo...@buffalo.edu wrote:

The code that uploads it is at backtype.storm.StormSubmitter#submitJar(java.util.Map, java.lang.String). It looks like it's just a simple upload over Thrift... SCP is specifically designed for file uploads, and is probably better-tuned for large transfers, through compression or whatever other means. You could just upload the jar using SCP, and then submit it from the server itself. I think many (most?) use cases are submitted on a local network, where upload speed is not a concern.

On Fri, Feb 14, 2014 at 5:18 PM, Adam Lewis superca...@gmail.com wrote:

I've seen this issue raised on the list in the past, but with no clear suggestions: storm jar is very slow at sending the jar file, averaging about 250 KB/s between my system and EC2... is there some reason for this in the way storm sends the jar? scp goes about 2.5 MB/s, the same 10x difference I've seen reported previously. I'm using storm 0.9.0.1.

Any ideas?

Thanks,
Adam

--
Twitter: @nathanmarz
http://nathanmarz.com
Re: http-client version conflict
My $0.02 on this subject: without going down the path of classloader or OSGi mania and becoming a full container, I'd definitely be in favor of Storm relocating its own dependencies. That way, edge cases around things like reflection can be handled once within Storm, rather than burdening every topology builder with those details.

Part of the problem seems to be that Storm makes extensive use (directly or transitively) of a lot of go-to utility libraries like Guava, Thrift, Joda-Time, json-simple, SnakeYAML, commons-io, etc. I'm sure that leveraging these libraries allowed Storm's development to proceed rapidly, but from a maturity perspective, it is problematic to impose these version choices on users. And while I might want Storm to, say, track the latest Guava version, that same policy could be very problematic for others. If Storm can relocate even some of its own dependencies, I think that would be a great help, to me at least.

Longer term, I wonder how much of these libraries is really being used. For example, is clj-time (and by extension Joda-Time) really needed? Or just a small fraction of the functionality in that library? I can probably pitch in some of the effort required to do this, if this is the direction people want to go in.

On Thu, Feb 6, 2014 at 8:44 PM, P. Taylor Goetz ptgo...@gmail.com wrote:

I'm glad the shader plugin worked for you. Updating dependencies can be tricky, as it can easily introduce regressions. Ultimately we need to figure out the best solution to avoiding conflicts between user code (i.e. dependencies in topology jar files) and Storm's libraries. The classloader approach has been attempted, but IMO Storm's use of serialization complicates things significantly. Package relocation seems to be a relatively lightweight solution. If that's a direction we pursue, then it introduces the question of whether Storm should relocate its dependencies, or whether that should be left up to the user (topology developer).

Elasticsearch has gone down the path of relocating some of their dependencies [1] (not necessarily an endorsement, just an observation). I've CC'd dev@ since this is all related to the infamous issue #115, which is now STORM-129 [2].

- Taylor

[1] https://github.com/elasticsearch/elasticsearch/blob/master/pom.xml#L474
[2] https://issues.apache.org/jira/browse/STORM-129

On Feb 6, 2014, at 7:25 PM, Vinay Pothnis vinay.poth...@gmail.com wrote:

Thank you all for the replies! The shade-plugin solution seems to work for us. I wonder if we can create a JIRA ticket for Storm to upgrade the http-client library as part of their next release.

-Vinay

On Thu, Feb 6, 2014 at 2:38 PM, Michael Rose mich...@fullcontact.com wrote:

We've done this with SLF4J and Guava as well without issues.

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com

On Thu, Feb 6, 2014 at 3:03 PM, Mark Greene m...@evertrue.com wrote:

We had this problem as well. We modified our Chef cookbook to just replace the older version with the newer one, and Storm didn't complain or have any other issues as a result.

On Wed, Feb 5, 2014 at 10:31 AM, P. Taylor Goetz ptgo...@gmail.com wrote:

Your best bet is probably to use the shade plugin to relocate the http-client package so it doesn't conflict with the version Storm uses. Storm does this with the libthrift dependency in storm-core: https://github.com/apache/incubator-storm/blob/master/storm-core/pom.xml#L220 (You can ignore the Clojure transformer in that config, unless you have non-AOT Clojure code that uses the http-client library.) More information on using the shade plugin to do package relocations can be found here: http://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

- Taylor

On Feb 4, 2014, at 4:27 PM, Vinay Pothnis vinay.poth...@gmail.com wrote:

Hello,

I am using storm version 0.9.0.1. My application depends on Apache http-client version 4.3.2, but storm depends on http-client version 4.1.1. What is the best way to override this dependency?

Thanks,
Vinay
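Following the class-relocation documentation linked in the thread, a minimal sketch of what relocating http-client in a topology POM might look like. The `shadedPattern` package name is hypothetical; pick any package private to your project:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Move our bundled http-client 4.3.2 under a private package
                 so it cannot collide with the 4.1.1 on Storm's classpath -->
            <pattern>org.apache.http</pattern>
            <shadedPattern>my.project.shaded.org.apache.http</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

The shade plugin rewrites both the relocated classes and all bytecode references to them in your jar, which is why this works without source changes; code that looks classes up reflectively by hard-coded name is the main case it cannot fix.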
Re: Not able to build wurstmeister storm-kafka-0.8-plus
I've found that Maven can drift away from Mac OS in terms of which Java version it uses (even if you have your preferred version on the PATH and in JAVA_HOME). Did you compare java -version and mvn -version? Something is getting built by Java 7; the latest storm-kafka-0.8-plus pom on GitHub depends on storm 0.9.0, which was accidentally built using Java 7. Check out storm 0.9.0.1 for a Java 6 build... or move to Java 7 if you can.

On Tue, Jan 14, 2014 at 2:00 AM, Krishnanand Khambadkone kkhambadk...@yahoo.com wrote:

Hi, when I do a mvn package I get this message:

Tests in error:
  testMultiplePartitionsOnDifferentHosts(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils : Unsupported major.minor version 51.0
  testSwitchHostForPartition(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils
  testGetBrokerInfo(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils
  testMultiplePartitionsOnSameHost(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils

My Java version is (on Mac OS X 10.7.5):

java version "1.6.0_29"
Java(TM) SE Runtime Environment (build 1.6.0_29-b11-402-11M4609)
Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02-402, mixed mode)
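For reference, "Unsupported major.minor version 51.0" means the class file was produced by a Java 7 compiler (class-file major version 50 = Java 6, 51 = Java 7, 52 = Java 8). A small sketch of decoding that number from a class-file header; the header bytes below are hand-written for illustration, not read from a real jar:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

/**
 * Decode the class-file major version behind errors like
 * "Unsupported major.minor version 51.0".
 */
public class ClassVersion {
    /** Read the major version from the standard class-file header. */
    static int majorVersion(byte[] classFile) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(classFile));
        if (in.readInt() != 0xCAFEBABE) {
            throw new IOException("not a class file");
        }
        in.readUnsignedShort();         // minor version
        return in.readUnsignedShort();  // major version
    }

    /** Classic numbering: major 44 + N corresponds to Java N. */
    static String jdkFor(int major) {
        return "Java " + (major - 44);
    }

    public static void main(String[] args) throws IOException {
        // Hand-written header: magic number, minor 0, major 51 (Java 7).
        byte[] header = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE,
                         0, 0, 0, 51};
        System.out.println(jdkFor(majorVersion(header)));
    }
}
```

Running this over `backtype/storm/utils/Utils.class` from the jar you actually resolved would confirm which JDK built it.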
Re: Not able to build wurstmeister storm-kafka-0.8-plus
I think Sasi's comment was along the right lines. If you specify the Storm version in your own POM, it will take precedence over the version specified in storm-kafka-0.8-plus. Note that storm 0.9.0.1 fixes the issue of Java 7 binaries.

On Tue, Jan 14, 2014 at 12:24 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com wrote:

Adam, thank you for the tips. Unfortunately moving to Java 7 is not an option for me. Even moving to a higher version of 1.6 (1.6_51 and above) breaks Hadoop and creates a mess. Is there a workaround for this, such as specifying the source and target version in pom.xml or some such thing?

On Tuesday, January 14, 2014 6:31 AM, Adam Lewis m...@adamlewis.com wrote:

I've found that Maven can drift away from Mac OS in terms of which Java version it uses (even if you have your preferred version on the PATH and in JAVA_HOME). Did you compare java -version and mvn -version? Something is getting built by Java 7; the latest storm-kafka-0.8-plus pom on GitHub depends on storm 0.9.0, which was accidentally built using Java 7. Check out storm 0.9.0.1 for a Java 6 build... or move to Java 7 if you can.

On Tue, Jan 14, 2014 at 2:00 AM, Krishnanand Khambadkone kkhambadk...@yahoo.com wrote:

Hi, when I do a mvn package I get this message:

Tests in error:
  testMultiplePartitionsOnDifferentHosts(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils : Unsupported major.minor version 51.0
  testSwitchHostForPartition(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils
  testGetBrokerInfo(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils
  testMultiplePartitionsOnSameHost(storm.kafka.DynamicBrokersReaderTest): backtype/storm/utils/Utils

My Java version is (on Mac OS X 10.7.5):

java version "1.6.0_29"
Java(TM) SE Runtime Environment (build 1.6.0_29-b11-402-11M4609)
Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02-402, mixed mode)
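A sketch of what "specifying the Storm version in your own POM" might look like. Maven resolves a direct dependency declaration in your own POM ahead of the transitive 0.9.0 pulled in by storm-kafka-0.8-plus ("nearest wins"). The `storm:storm` coordinates match the pre-Apache releases of that era, and the `provided` scope assumes the cluster supplies Storm at runtime; verify both against your repository setup:

```xml
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm</artifactId>
  <!-- 0.9.0.1 is the release built with Java 6, per the thread -->
  <version>0.9.0.1</version>
  <!-- assumed: the Storm cluster provides these classes at runtime -->
  <scope>provided</scope>
</dependency>
```

`mvn dependency:tree` is a quick way to confirm which Storm version actually won resolution.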
Re: Is storm suitable for processing tuples with some dependency
Probably yes, but how you do it will depend on the specifics of your use case. For instance, you will have to decide whether partial pairings are best kept in memory or in a more reliable state store, and whether to use Trident or not. I found Svend's article on this sort of thing to be a great starting point when thinking through this sort of problem: http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/

Adam

On Sun, Jan 12, 2014 at 10:42 AM, 李家宏 jh.li...@gmail.com wrote:

Hi all,

Is storm suitable for processing tuples with some dependency? For example, one tuple contains a request and another tuple contains the corresponding response; the two tuples depend on each other. Is it possible to treat the two tuples as a complete session while stream processing?

Regards

--
==
Gvain
Email: jh.li...@gmail.com
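As a minimal illustration of the in-memory option, here is a sketch of pairing request and response tuples by a shared session id. All names are illustrative; in a real topology this logic would live in a bolt receiving a fields grouping on the session id (so both halves of a pair land on the same task), or in a Trident state as in Svend's article:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Buffers whichever half of a request/response pair arrives first,
 * keyed by session id, and emits the completed pair when the other
 * half shows up. No timeout handling: a production version would
 * also evict stale partial pairings.
 */
public class SessionJoiner {
    private final Map<String, String> pendingRequests = new HashMap<>();
    private final Map<String, String> pendingResponses = new HashMap<>();

    /** Returns "request|response" when the pair completes, else null. */
    public String onRequest(String sessionId, String request) {
        String response = pendingResponses.remove(sessionId);
        if (response != null) {
            return request + "|" + response;  // response arrived first
        }
        pendingRequests.put(sessionId, request);
        return null;
    }

    public String onResponse(String sessionId, String response) {
        String request = pendingRequests.remove(sessionId);
        if (request != null) {
            return request + "|" + response;  // request arrived first
        }
        pendingResponses.put(sessionId, response);
        return null;
    }
}
```

Note the symmetry: since either tuple may arrive first, both directions buffer, which is the crux of the "partial pairings in memory vs. a reliable state store" decision above.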