Re: Number of executors number of tasks?
Hello! Unfortunately you cannot change the number of tasks when rebalancing. Consider the following scenario:

1. A Storm topology with a maximum parallelism of 10 tasks.
2. The topology is deployed in the cloud.
3. The application should scale horizontally depending on the incoming volume.
4. When the volume is high you can add more machines, but unfortunately they cannot be used because of the task limit (you would be adding machines beyond the maximum parallelism supported by your topology).

I find this limitation an inconvenience for cloud applications, where theoretically you can scale horizontally without limit. You can check my complaints about the parallelism examples from the wiki page here: http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3CCAOy42CtEz6P0kRVOEff=u83bo_317ld3zrmywgkt2yjx6qr...@mail.gmail.com%3E

Hope this helps. Regards, Florin

On Mon, Feb 17, 2014 at 11:58 PM, Enno Shioji eshi...@gmail.com wrote: This SO answer seems to answer your question: http://stackoverflow.com/a/17454586/234901

On Mon, Feb 17, 2014 at 7:30 PM, Abhishek Bhattacharjee abhishek.bhattacharje...@gmail.com wrote: Practically, the number of workers should always be less than or equal to the number of tasks. This is not only valid for computer science but for any field: if we have more workers than available tasks, we are simply wasting resources, i.e. not using them optimally. So in most cases there should be at least one task or more running on an executor. Now consider an executor that normalizes a sentence into words: when a sentence arrives, the executor normalizes it, and after it is finished it is ready to receive another tuple. With Storm there can be several executors doing the same job on different sentences simultaneously, but that doesn't mean we have to have as many executors as there are sentences. I hope this makes sense. 
On Mon, Feb 17, 2014 at 11:11 PM, Simon Cooper simon.coo...@featurespace.co.uk wrote: I've been looking at the parallelism of a Storm topology. In what situations is it useful to have more than one task running on the same executor? An executor is a thread, so if there are several tasks on that executor, only one task can run at a time. So why would you want more than one task on the same thread, if only one can run at once? SimonC -- *Abhishek Bhattacharjee* *Pune Institute of Computer Technology*
Storm 0.9.0.1 - Topology logs are not getting printed anywhere
Hi, I am using Storm 0.9.0.1. My topology files use the slf4j logger (as recommended here - http://storm.incubator.apache.org/2013/12/08/storm090-released.html). But none of the log statements are being printed on the nimbus/supervisor servers. I am looking for my log statements under the ${storm.home}/logs directory. Does anyone know why? Thanks, Binita
Re: Storm 0.9.0.1 - Topology logs are not getting printed anywhere
Hi, general tip: install mlocate and then locate your log file:

yum install mlocate
updatedb
locate file_name

Regards, Kavi Kumar Koneti

On Tue, Feb 18, 2014 at 3:09 PM, Binita Bharati binita.bhar...@gmail.com wrote: Hi , I am using Storm 0.9.0.1. My topology files use slf4j logger (as recommended here - http://storm.incubator.apache.org/2013/12/08/storm090-released.html). But, none of the log statements are being printed in the nimbus/supervisor servers. I am searching for my log statements under - ${storm.home}/logs directory. Anyone knows why ? Thanks Binita
Re: Storm 0.9.0.1 - Topology logs are not getting printed anywhere
There was a Netty-related configuration exception in my worker logs, because of which the topology wasn't executed at all. After correcting the Netty error, I am able to see my topology log statements in the worker logs. On Tue, Feb 18, 2014 at 3:09 PM, Binita Bharati binita.bhar...@gmail.com wrote: Hi , I am using Storm 0.9.0.1. My topology files use slf4j logger (as recommended here - http://storm.incubator.apache.org/2013/12/08/storm090-released.html). But, none of the log statements are being printed in the nimbus/supervisor servers. I am searching for my log statements under - ${storm.home}/logs directory. Anyone knows why ? Thanks Binita
What is the structure of a fatjar, if any?
Hi all, I need to add dependent jars to my topology. With 'storm jar', the jar file must contain all dependencies, but I can't find information on how that jar has to be structured. I only find references to Maven plugins, Gradle magic, or Leiningen, none of which I am using. Could some kind soul briefly describe how the jar needs to be structured? 1) Unpack all dependent jars and repack everything into one jar; is that what "fatjar" refers to? 2) Or just add the dependent jars as jars, and they are unpacked for use with the topology? Thanks, Harald.
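Regarding option 1: yes, that is what a "fat jar" (also called an uber-jar) usually means. Every dependency jar is unpacked and its entries are repacked into one flat archive, so your topology classes and all dependency classes sit side by side; option 2 generally does not work, because a standard JVM classpath does not read jars nested inside another jar. Since jar files are just zip archives, the structure can be illustrated with a small sketch; `build_fat_jar` and its duplicate-entry policy are my own illustration, not a standard tool:

```python
import zipfile

def build_fat_jar(output_path, jar_paths):
    """Unpack every input jar and repack the entries into one flat jar.

    Jar files are plain zip archives. On duplicate entry names, the
    first jar listed wins; real tools (e.g. the Maven shade plugin)
    make this policy configurable and also merge META-INF entries,
    which this sketch ignores.
    """
    seen = set()
    with zipfile.ZipFile(output_path, "w") as out:
        for jar in jar_paths:
            with zipfile.ZipFile(jar) as zf:
                for name in zf.namelist():
                    if name.endswith("/") or name in seen:
                        continue  # skip directory entries and duplicates
                    seen.add(name)
                    out.writestr(name, zf.read(name))
```

Listing your own topology jar first makes your classes win on name clashes with dependency classes.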
Re: How to specify worker.childopts for a specified topology?
Try this: conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, WORKER_OPTS); Your WORKER_OPTS should be appended to WORKER_CHILDOPTS. -- Derek

On 2/18/14, 1:47, Link Wang wrote: Dear all, I want to specify some worker.childopts for my topology within its code, and I use this way: conf.put(Config.WORKER_CHILDOPTS, WORKER_OPTS); but I found it doesn't work. I don't use the storm.yaml file to set worker.childopts, because the memory requirements of my topologies are widely different. Has anyone encountered the same problem?
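For context, the cluster-wide default for these JVM flags lives in storm.yaml under worker.childopts, and the per-topology setting Derek describes (string key topology.worker.childopts) is appended to it when the worker JVM is launched. A sketch of the two layers; the -Xmx values are made-up examples, not recommendations:

```yaml
# storm.yaml on the supervisors: the baseline for every worker JVM
worker.childopts: "-Xmx768m"

# In topology code, conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2048m");
# appends "-Xmx2048m" after the baseline for that topology's workers;
# HotSpot-style JVMs honor the last -Xmx on the command line.
```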
Re: Reading state of a streaming topology via DRPC?
Thanks for the pointer. I'll have a close look at this. Is there also an example in the 'plain' Storm API? Niels

On Feb 16, 2014 11:14 AM, Enno Shioji eshi...@gmail.com wrote: This uses Trident but I think it covers what you need: https://github.com/eshioji/trident-tutorial/blob/master/src/main/java/tutorial/storm/trident/Part04_BasicStateAndDRPC.java

On Sat, Feb 15, 2014 at 8:17 PM, Niels Basjes ni...@basjes.nl wrote: Hi, I want to create a bolt that keeps some kind of state (aggregate) about the messages it has seen so far (i.e. a web click stream). Once such a bolt has gathered information, I would like to get at that information from an application I am designing. So far I've come up with two ways of getting at this data: 1) Persist it into something like HBase (i.e. push from the bolt). 2) Use DRPC to query the bolt state directly. Regarding this last idea (using DRPC): Is this possible? If it is, where can I find an example of how to create a single topology that is essentially both streaming and DRPC? Thanks. -- Best regards, Niels Basjes
Re: Millisecond-level tick generation granularity does not work
Hi Antonio, James has created a JIRA issue: https://issues.apache.org/jira/browse/STORM-186. And I just opened a PR for it: https://github.com/apache/incubator-storm/pull/37 -- Best Regards! 肖康 (Kang Xiao, kxiao.ti...@gmail.com) Distributed Software Engineer

On Friday, December 27, 2013, at 3:02, Antonio Verardi wrote: Thanks, guys!

On Fri, Dec 20, 2013 at 6:06 PM, James Xu xumingmi...@gmail.com wrote: Yeah, it is a bug. Can you open a Pull Request, Kang?

On December 21, 2013, at 2:20 AM, 肖康 (Kang Xiao) kxiao.ti...@gmail.com wrote: It seems that PR #547 was modified before it was merged upstream. @nathan, is it a bug? https://github.com/apache/incubator-storm/commits/master/storm-core/src/clj/backtype/storm/timer.clj

On Sat, Dec 21, 2013 at 2:08 AM, 肖康 (Kang Xiao) kxiao.ti...@gmail.com wrote: Yes, it's truncated. The code is as follows: (* 1000 (long delay-secs)) https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/timer.clj#L85

On Fri, Dec 20, 2013 at 9:11 AM, Antonio Verardi anto...@yelp.com wrote: Hi, I have already asked this question on the dev mailing list, but no one answered. Maybe I'll be luckier here; sorry for the spam. I noticed that Storm 0.9.0 allows setting tick tuple frequencies at the millisecond level: https://github.com/nathanmarz/storm/pull/547 However, when I tried to use the feature, even though the config shows the floating-point value I specified (3.9, for example), the tick tuple seems to be generated every truncate(float_value) seconds (in this case, every 3 seconds). Has anyone experienced the same issue? Should I open an issue on Apache JIRA? -- Best Regards! 肖康 (Kang Xiao, kxiao.ti...@gmail.com) Distributed Software Engineer
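The root cause Kang points to is an order-of-operations bug: the delay is coerced to a long before being scaled to milliseconds, so the fractional seconds are thrown away. The same effect in a tiny Python sketch (standing in for the Clojure `(* 1000 (long delay-secs))`; the function names are mine):

```python
def buggy_delay_ms(delay_secs):
    # mirrors (* 1000 (long delay-secs)): truncate first, then scale,
    # so any fractional seconds are lost before the multiplication
    return 1000 * int(delay_secs)

def fixed_delay_ms(delay_secs):
    # scale first, then truncate: fractional seconds survive as milliseconds
    return int(1000 * delay_secs)
```

With delay_secs = 3.9, the buggy form yields 3000 ms, matching the 3-second ticks Antonio observed, while the fixed form yields the intended 3900 ms.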
Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans
Hi all, how do you get bolts that take a ludicrously long time to load (we're talking minutes here) to cooperate with Zookeeper? I may not be understanding my problem properly, but on my test cluster (**not** in local mode!) my bolt keeps getting restarted in the middle of its prepare() method, which may take up to two minutes to return. The problem seems to be the "Client session timed out" message, but I'm not knowledgeable enough about Zookeeper to really know how to fix this. Here's a portion of the logs from the affected supervisor. The STDIO messages come from a poorly-coded third-party library that I have to use.

2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 2747ms for sessionid 0x143a22eb4060078, closing socket connection and attempting reconnect
2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
2014-01-17 23:19:28 b.s.cluster [WARN] Received event :disconnected::none: with disconnected Zookeeper.
2014-01-17 23:19:28 b.s.cluster [WARN] Received event :disconnected::none: with disconnected Zookeeper.
2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec].
2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma
2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner
2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz
2014-01-17 23:19:28 STDIO [ERROR] ...
2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper/192.168.50.3:2181
^-- This is where the bolt gets restarted in its initialization.

Thanks, Eddie
Re: Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans
You may need to configure your cluster to give it more time to start up. Additionally, knowing how long it can take to load the Stanford NLP models, make sure you're only loading them once per JVM (e.g. via a static initializer or double-checked locking) and sharing them between all your bolt instances.

supervisor.worker.start.timeout.secs 120
supervisor.worker.timeout.secs 60

I'd start by tuning your worker start timeout here. Try setting it up to 300s and (again) ensure your prepare method initializes expensive resources only once, then shares them between instances in the JVM.

Michael Rose (@Xorlev https://twitter.com/xorlev) Senior Platform Engineer, FullContact http://www.fullcontact.com/ mich...@fullcontact.com

On Tue, Feb 18, 2014 at 1:45 PM, Eddie Santos easan...@ualberta.ca wrote: Hi all, How do you get bolts that take a ludicrously long time to load (we're talking minutes here) to cooperate with Zookeeper? I may not be understanding my problem properly, but on my test cluster (**not** in local mode!) my bolt keeps getting restarted in the middle of its prepare() method -- which may take up to two minutes to return. The problem seems to be the Client session timed out, but I'm not knowledgable enough with Zookeeper to really know how to fix this. Here's a portion of logs from the supervisor affected. The STDIO messages come from a poorly-coded third party library that I have to use. 
2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out, have not heard from server in 2747ms for sessionid 0x143a22eb4060078, closing socket connection and attempting reconnect
2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State change: SUSPENDED
2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are no ConnectionStateListeners registered.
2014-01-17 23:19:28 b.s.cluster [WARN] Received event :disconnected::none: with disconnected Zookeeper.
2014-01-17 23:19:28 b.s.cluster [WARN] Received event :disconnected::none: with disconnected Zookeeper.
2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec].
2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma
2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner
2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz
2014-01-17 23:19:28 STDIO [ERROR] ...
2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769, :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]}, :port 6702}
2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection to server zookeeper/192.168.50.3:2181
^-- This is where the bolt gets restarted in its initialization.

Thanks, Eddie
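Concretely, the knobs Michael mentions are supervisor-side settings in storm.yaml (the numbers he quotes are the defaults). A sketch of a bumped start timeout for a slow prepare(); the 300 is just the illustrative value from his suggestion, not a recommendation:

```yaml
# storm.yaml on each supervisor
supervisor.worker.start.timeout.secs: 300   # grace period while workers launch and prepare()
supervisor.worker.timeout.secs: 60          # heartbeat timeout once the worker is up
```

Depending on where the restart is triggered from, nimbus.task.launch.secs on the nimbus side may need a similar bump.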
Re: How to specify worker.childopts for a specified topology?
Yes! It works when using Config.TOPOLOGY_WORKER_CHILDOPTS. Thanks a lot!

On Tue, Feb 18, 2014 at 11:50 PM, Derek Dagit der...@yahoo-inc.com wrote: Try this: conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, WORKER_OPTS); Your WORKER_OPTS should be appended to WORKER_CHILDOPTS. -- Derek On 2/18/14, 1:47, Link Wang wrote: Dear all, I want to specify some worker.childopts for my topology within its code, and I use this way: conf.put(Config.WORKER_CHILDOPTS, WORKER_OPTS); but I found it doesn't work. I don't use the storm.yaml file to set worker.childopts, because the memory requirements of my topologies are widely different. Has anyone encountered the same problem?
Experiencing python (v2.7.5) thrift errors for v0.9 (getTopology / getTopologyInfo against nimbus) ...
Hello: (Apologies: the first send of this email was formatted poorly and wasn't very readable, so I am resending.) I downloaded the latest Storm source yesterday (v0.9) and compiled the Python thrift bindings with it, like so: /usr/bin/thrift --gen py storm.thrift I installed the compiled output into /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d The resulting thrift interface works more or less, but I've run into two fatal issues while coding/testing throughout the day (from within an IDE, and also from within an interactive bpython session):

(1) Every other call to client.getTopologyInfo(id) / client.getTopology(id) hangs and I have to Ctrl-C and re-issue it. And it's literally 50/50: It-Hangs / Doesn't-Hang / It-Hangs / Doesn't-Hang / ... (etc.).

(2) For the 50% of the time where calls to client.getTopologyInfo(id) / client.getTopology(id) don't hang, I'm getting two different types of exceptions: a) TypeError: unhashable instance b) unknown result

Below is a paste of the typical interaction sequence (using client.getTopologyInfo(id) here). I also want to note that the issue became worse after pushing data through the previously idle topology (perhaps this altered elements of the underlying data structures); before that, client.getTopologyInfo(id) was working. Finally, I should also mention that client.getUserTopology(id) (not mentioned previously) has never worked; it always returns TypeError: unhashable instance. Does anyone know what might be happening here? Thank you in advance (see below). 
# ===
client.getTopologyInfo('run01-2-1392747346')
Traceback (most recent call last):
  File input, line 1, in module
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 586, in getTopologyInfo
    return self.recv_getTopologyInfo()
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 610, in recv_getTopologyInfo
    raise TApplicationException(TApplicationException.MISSING_RESULT, getTopologyInfo failed: unknown result);
TApplicationException: getTopologyInfo failed: unknown result
# ===
client.getTopologyInfo('run01-2-1392747346')
ctrl-C (because it hung)
# ===
client.getTopologyInfo('run01-2-1392747346')
Traceback (most recent call last):
  File input, line 1, in module
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 586, in getTopologyInfo
    return self.recv_getTopologyInfo()
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 604, in recv_getTopologyInfo
    result.read(self._iprot)
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 2832, in read
    self.success.read(iprot)
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 2726, in read
    _elem265.read(iprot)
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 2602, in read
    self.stats.read(iprot)
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 2393, in read
    self.specific.read(iprot)
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 2282, in read
    self.bolt.read(iprot)
  File /opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 1927, in read
    _val86[_key92] = _val93
TypeError: unhashable instance
# ===
trident: how to access groupBy() keys in a stream after aggregation
Hello, a beginner question coming up. I'm trying to build analytics crunching with Storm Trident: a continuous stream of events from which I need to group/aggregate things and then write the aggregated results over a time-slice into a database for quick access later on. I am starting with the following topology:

TridentState state = topology.newStream("logspout", spout)
    .parallelismHint(8)
    .each(new Fields("json"), new ProcessJsonFunction(), new Fields("ad_id", "zone", "impressions", "clicks"))
    .groupBy(new Fields("ad_id", "zone"))
    .chainedAgg()
    .aggregate(new Fields("impressions"), new Sum(), new Fields("impressions_sum"))
    .aggregate(new Fields("clicks"), new Sum(), new Fields("clicks_sum"))
    .chainEnd()
    .partitionPersist(new AnalyticsStateFactory(), new Fields("impressions_sum", "clicks_sum"), new AnalyticsStateUpdater());

And then in my class AnalyticsStateUpdater's method updateState() I would like to store the aggregated values (impressions_sum, clicks_sum) per key bucket. Here I ran into problems: how do I, in the StateUpdater, know which groupBy() bucket the aggregated data belongs to? In other words, I need the key formed from the values of the fields (ad_id, zone). The aggregated values themselves end up properly summed in the StateUpdater. I am aware this is probably trivial, but from the Trident documentation (or the lack of it) I cannot seem to figure out how to do this. BR, - Matti
Re: trident: how to access groupBy() keys in a stream after aggregation
Hi Matti, use a persistent aggregate; it does precisely what you are describing: it hands the result of an Aggregator to a Trident State so it can be saved somewhere based on the groupBy bucket it belongs to. Here's a blog post where I explain my understanding of how it works: http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/ Cheers, Svend

On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom mdahlbom...@gmail.com wrote: Hello, a beginner question coming up. I'm trying to build analytics crunching with Storm Trident; a continuous stream of events of which I need to group/aggregate things and then write the aggregated results over a time-slice into a database for quick access later on. I am starting with the following topology: TridentState state = topology.newStream(logspout, spout) .parallelismHint(8).each(new Fields(json), new ProcessJsonFunction(), new Fields(ad_id, zone, impressions, clicks)) .groupBy(new Fields(ad_id, zone)) .chainedAgg() .aggregate(new Fields(impressions), new Sum(), new Fields(impressions_sum)) .aggregate(new Fields(clicks), new Sum(), new Fields(clicks_sum)) .chainEnd() .partitionPersist(new AnalyticsStateFactory(), new Fields(impressions_sum, clicks_sum), new AnalyticsStateUpdater()); And then in my class AnalyticsStateUpdater's method updateState() I would like to store the aggregated values (impressions_sum, clicks_sum) per key-bucket -- and here I ran into problems; how do I - in the StateUpdater - know to which groupBy() bucket the aggregated data belongs to? In other words I would need to get the key formed of the values of the fields (ad_id, zone). The aggregated values themselves end up properly summed in the StateUpdater. I am aware this is probably trivial but from the Trident documentation (or the lack of it) I cannot seem to figure out how to do this. BR, - Matti
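What a persistent aggregate buys you, compared with partitionPersist, is that the groupBy keys are carried through to the state alongside the aggregated values (with a map-backed state, as the keys of its multiGet/multiPut calls), so the updater never has to recover them itself. As a plain-Python sketch of that bookkeeping, not the Trident API (update_state and the dict-backed store are illustrative only):

```python
# Conceptual sketch of grouped persistent aggregation: each partial
# aggregate arrives together with its group key, and is folded into a
# store keyed by the groupBy fields -- here (ad_id, zone).
def update_state(store, batch):
    """store: dict mapping (ad_id, zone) -> {"impressions": n, "clicks": n}
    batch: list of (group_key, impressions_sum, clicks_sum) partials."""
    for key, impressions, clicks in batch:
        bucket = store.setdefault(key, {"impressions": 0, "clicks": 0})
        bucket["impressions"] += impressions
        bucket["clicks"] += clicks
    return store
```

A real MapState implementation would do the read-increment-write against the database instead of a dict, but the key point is the same: the (ad_id, zone) key arrives with the aggregate.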