Re: Number of executors number of tasks?

2014-02-18 Thread Spico Florin
Hello!
   Unfortunately you cannot change the number of tasks when doing a
rebalance. Consider the following scenario:
1. A storm topology with a maximum parallelism of 10 tasks.
2. Your storm topology is deployed in the cloud.
3. Your application should scale horizontally depending on the incoming
volume.
4. If the volume is high you can scale out with more machines, but
unfortunately they cannot be used due to the task limit (meaning that
you are adding machines that exceed the maximum parallelism supported
by your topology).
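
To illustrate the limit: tasks are fixed when the topology is submitted, and a rebalance can only change the number of workers/executors, so the usable parallelism is capped by the task count. A plain-Java sketch of that cap, not the Storm API (ParallelismCap and effectiveExecutors are hypothetical names for illustration):

```java
public class ParallelismCap {
    // Tasks are fixed at submit time; "storm rebalance" can only change
    // the worker/executor counts, and each executor needs at least one
    // task, so usable executors can never exceed the task count.
    static int effectiveExecutors(int requestedExecutors, int numTasks) {
        return Math.min(requestedExecutors, numTasks);
    }

    public static void main(String[] args) {
        // Topology submitted with a maximum parallelism of 10 tasks:
        System.out.println(effectiveExecutors(8, 10));  // scaling up to 8 works
        System.out.println(effectiveExecutors(16, 10)); // 16 requested, only 10 usable
    }
}
```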

I find this limitation an inconvenience for cloud applications,
where, theoretically, you can scale horizontally without limit.


You can check my complaints about the examples from the wiki page regarding
parallelism here:
http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3CCAOy42CtEz6P0kRVOEff=u83bo_317ld3zrmywgkt2yjx6qr...@mail.gmail.com%3E

Hope this helps.
 Regards,
 Florin



On Mon, Feb 17, 2014 at 11:58 PM, Enno Shioji eshi...@gmail.com wrote:

 This SO answer seems to answer your question:
 http://stackoverflow.com/a/17454586/234901


 On Mon, Feb 17, 2014 at 7:30 PM, Abhishek Bhattacharjee 
 abhishek.bhattacharje...@gmail.com wrote:

 Practically speaking, the number of workers should always be less than or
 equal to the number of tasks. This is valid not only in computer science but
 in any field: if we have more workers than available tasks, then we are
 just wasting our resources, i.e. we are not using them optimally.
 So in most cases there should be at least one task running on an
 executor.
 Now, consider an executor which normalizes a sentence into words: when a
 sentence arrives, the executor normalizes it, and after it is finished it
 is ready to receive another tuple.
 With storm there can be several executors doing the same job on different
 sentences simultaneously, but that doesn't mean we have to have as many
 executors as there are sentences. I hope this makes sense.
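
The point that an executor is a single thread can be seen with plain Java: a single-threaded executor serving several submitted tasks runs them one at a time, on the same thread. This is a JDK sketch of that scheduling, nothing Storm-specific:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OneThreadManyTasks {
    public static void main(String[] args) throws Exception {
        // One "executor" (thread) serving two "tasks": they never run
        // concurrently; the single thread just alternates between them.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Long> task1 = executor.submit(() -> Thread.currentThread().getId());
        Future<Long> task2 = executor.submit(() -> Thread.currentThread().getId());
        // Both tasks report the same thread id.
        System.out.println(task1.get().equals(task2.get())); // true
        executor.shutdown();
    }
}
```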


 On Mon, Feb 17, 2014 at 11:11 PM, Simon Cooper 
 simon.coo...@featurespace.co.uk wrote:

  I've been looking at the parallelism of a storm topology. In what
 situations is it useful to have more than one task running on the same
 executor? An executor is a thread, so if there are several tasks on that
 executor only one task can run at a time. So why would you want more than
 one task on the same thread, if only one can run at once?



 SimonC




 --
 *Abhishek Bhattacharjee*
 *Pune Institute of Computer Technology*





Storm 0.9.0.1 - Topology logs are not getting printed anywhere

2014-02-18 Thread Binita Bharati
Hi ,

I am using Storm 0.9.0.1. My topology files use the slf4j logger (as
recommended here -
http://storm.incubator.apache.org/2013/12/08/storm090-released.html).

But none of the log statements are being printed on the nimbus/supervisor
servers. I am looking for my log statements under the ${storm.home}/logs
directory.

Does anyone know why?

Thanks
Binita


Re: Storm 0.9.0.1 - Topology logs are not getting printed anywhere

2014-02-18 Thread Kavi Kumar
Hi,

General tip: install mlocate and then locate your log file.

yum install mlocate
updatedb
locate file_name

Regards,
Kavi Kumar Koneti



On Tue, Feb 18, 2014 at 3:09 PM, Binita Bharati binita.bhar...@gmail.com wrote:

 Hi ,

 I am using Storm 0.9.0.1. My topology files use slf4j logger (as
 recommended here -
 http://storm.incubator.apache.org/2013/12/08/storm090-released.html).

 But, none of the log statements are being printed in the nimbus/supervisor
 servers. I am searching for my log statements under - ${storm.home}/logs
 directory.

 Anyone knows why ?

 Thanks
 Binita



Re: Storm 0.9.0.1 - Topology logs are not getting printed anywhere

2014-02-18 Thread Binita Bharati
There was a Netty config related Exception in my worker logs, because of
which the topology wasn't executed at all.

After correcting the Netty related error, I am able to see my topology log
statements in the worker logs.




On Tue, Feb 18, 2014 at 3:09 PM, Binita Bharati binita.bhar...@gmail.com wrote:

 Hi ,

 I am using Storm 0.9.0.1. My topology files use slf4j logger (as
 recommended here -
 http://storm.incubator.apache.org/2013/12/08/storm090-released.html).

 But, none of the log statements are being printed in the nimbus/supervisor
 servers. I am searching for my log statements under - ${storm.home}/logs
 directory.

 Anyone knows why ?

 Thanks
 Binita



What is the structure of a fatjar, if any?

2014-02-18 Thread Harald Kirsch

Hi all,

I need to add dependent jars to my topology. With 'storm jar', the jar
file must contain all dependencies, but I can't find information on how the
jar must be structured. I only find references to Maven plugins, Gradle
magic, or Leiningen, none of which I am using.


Could some kind soul just briefly describe how the jar needs to be
structured:


1) Unpack all dependent jars and repack them into one jar. Is that what
fatjar refers to?


2) Or just add the dependent jars as jars, and they are unpacked for use
with the topology?
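
For what it's worth, option (1) is what "fat jar" usually refers to: every dependency jar is unpacked and its entries are repacked into a single archive, so all classes sit at the top level of one jar (nested jar-in-jar is not understood by the standard classloader). A minimal sketch of that repacking using only the JDK (FatJarSketch is a hypothetical name; real builds also merge manifests and other META-INF resources, which this skips):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;

public class FatJarSketch {
    // Option (1): unpack every dependency jar and repack the entries into
    // a single jar, so all classes sit at the top level of one archive.
    public static void merge(List<Path> inputJars, Path outputJar) throws IOException {
        Set<String> seen = new HashSet<>();
        try (JarOutputStream out = new JarOutputStream(Files.newOutputStream(outputJar))) {
            for (Path in : inputJars) {
                try (JarInputStream jin = new JarInputStream(Files.newInputStream(in))) {
                    for (JarEntry e; (e = jin.getNextJarEntry()) != null; ) {
                        if (!seen.add(e.getName())) continue; // first entry wins on duplicates
                        out.putNextEntry(new JarEntry(e.getName()));
                        jin.transferTo(out); // copy this entry's bytes
                        out.closeEntry();
                    }
                }
            }
        }
    }
}
```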


Thanks,
Harald.


Re: How to specify worker.childopts for a specified topology?

2014-02-18 Thread Derek Dagit

Try this:

conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, WORKER_OPTS);

Your WORKER_OPTS should be appended to WORKER_CHILDOPTS.
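
Under the hood both settings are just string keys in the conf map: Config.WORKER_CHILDOPTS is "worker.childopts" (a cluster-wide value read from storm.yaml, which is why putting it in the topology conf has no effect), while Config.TOPOLOGY_WORKER_CHILDOPTS is "topology.worker.childopts" and is appended after the cluster-wide opts when the worker JVM is launched. A plain-Java sketch of that append behaviour (no Storm classes; the actual launch logic lives in the supervisor):

```java
import java.util.HashMap;
import java.util.Map;

public class ChildOptsSketch {
    public static void main(String[] args) {
        // Cluster-wide default from storm.yaml:
        Map<String, Object> clusterConf = new HashMap<>();
        clusterConf.put("worker.childopts", "-Xmx768m");

        // Per-topology override, set in code via
        // conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, ...):
        Map<String, Object> topoConf = new HashMap<>();
        topoConf.put("topology.worker.childopts", "-Xmx2048m");

        // The topology opts are appended after the cluster opts; for flags
        // like -Xmx the later occurrence generally wins in HotSpot, so the
        // per-topology memory setting takes effect.
        String launchOpts = clusterConf.get("worker.childopts") + " "
                          + topoConf.get("topology.worker.childopts");
        System.out.println(launchOpts); // -Xmx768m -Xmx2048m
    }
}
```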
--
Derek

On 2/18/14, 1:47, Link Wang wrote:

Dear all,
I want to specify some worker.childopts for my topology within its code, and I
use this way:
conf.put(Config.WORKER_CHILDOPTS, WORKER_OPTS);
but I found it doesn't work.

I don't use the storm.yaml file to set worker.childopts, because the memory
requirements of my topologies are widely different.

Has anyone encountered the same problem?


Re: Reading state of a streaming topology via DRPC?

2014-02-18 Thread Niels Basjes
Thanks for the pointer. I'll have a closer look at this.
Is there also an example in the 'plain' Storm API?

Niels
On Feb 16, 2014 11:14 AM, Enno Shioji eshi...@gmail.com wrote:

 This uses Trident but I think it covers what you need:

 https://github.com/eshioji/trident-tutorial/blob/master/src/main/java/tutorial/storm/trident/Part04_BasicStateAndDRPC.java


 On Sat, Feb 15, 2014 at 8:17 PM, Niels Basjes ni...@basjes.nl wrote:

 Hi,

 I want to create a bolt that keeps some kind of state (an aggregate) about
 the messages it has seen so far (i.e. a web click stream).
 Once such a bolt has gathered information, I would like to get at that
 information from an application I am designing.

 So far I've come up with two ways of getting at this data:
 1) Persist it into something like HBase (i.e. push from the bolt).
 2) Use DRPC to query the bolt state directly.

 Regarding this last idea (using DRPC): is this possible?
 If it is, where can I find an example of how to create a single
 topology that is essentially both streaming and DRPC?

 Thanks.

 --
 Best regards,

 Niels Basjes





Re: Millisecond-level tick generation granularity does not work

2014-02-18 Thread Kang Xiao
Hi Antonio

James has created a JIRA issue
(https://issues.apache.org/jira/browse/STORM-186) and I just opened a PR
(https://github.com/apache/incubator-storm/pull/37) for it.
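
The underlying bug is just cast-before-multiply: the delay in seconds is truncated to a long before being converted to milliseconds, instead of after. A plain-Java illustration of the difference (the actual fix is in the Clojure timer code the PR touches):

```java
public class TickTruncation {
    public static void main(String[] args) {
        double delaySecs = 3.9; // millisecond-granularity tick frequency

        // Buggy order, equivalent to (* 1000 (long delay-secs)):
        long buggyMillis = (long) delaySecs * 1000;   // 3 * 1000 = 3000

        // Fixed order: truncate only after scaling to milliseconds.
        long fixedMillis = (long) (delaySecs * 1000); // 3900

        System.out.println(buggyMillis + " vs " + fixedMillis); // 3000 vs 3900
    }
}
```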

--  
Best Regards!

肖康(Kang Xiao,kxiao.ti...@gmail.com (mailto:kxiao.ti...@gmail.com))
Distributed Software Engineer


On Friday, December 27, 2013, at 3:02, Antonio Verardi wrote:

 Thanks, guys!
  
  
 On Fri, Dec 20, 2013 at 6:06 PM, James Xu xumingmi...@gmail.com 
 (mailto:xumingmi...@gmail.com) wrote:
  Yeah, it is a bug. Can you open a Pull Request, Kang?
   
  On December 21, 2013, at 2:20 AM, 肖康 (Kang Xiao) kxiao.ti...@gmail.com
  (mailto:kxiao.ti...@gmail.com) wrote:
    It seems that PR #547 was modified before it was merged upstream.
    @nathan, is it a bug?

   https://github.com/apache/incubator-storm/commits/master/storm-core/src/clj/backtype/storm/timer.clj
 


   On Sat, Dec 21, 2013 at 2:08 AM, 肖康(Kang Xiao) kxiao.ti...@gmail.com 
   (mailto:kxiao.ti...@gmail.com) wrote:
Yes, it's truncated. The code is as follows.
 
(* 1000 (long delay-secs)))  
https://github.com/apache/incubator-storm/blob/master/storm-core/src/clj/backtype/storm/timer.clj#L85
 
 
On Fri, Dec 20, 2013 at 9:11 AM, Antonio Verardi anto...@yelp.com 
(mailto:anto...@yelp.com) wrote:
 Hi,
  
 I have already asked this question in the dev mailing list, but no
 one answered. Maybe I'll be luckier here. Sorry for the spam.
  
 I noticed that Storm 0.9.0 allows setting tick tuple frequencies at
 the millisecond level:
 https://github.com/nathanmarz/storm/pull/547
  
 However, when I tried to use the feature, even though the config
 seems to show the floating-point value I specified (3.9 for example),
 the tick tuple seems to be generated every truncate(float_value)
 seconds (in this case, every 3 seconds).
  
 Has anyone experienced the same issue? Should I open an issue
 on Apache JIRA?
 
 
--  
Best Regards!
 
肖康(Kang Xiao,kxiao.ti...@gmail.com (mailto:kxiao.ti...@gmail.com))
Distributed Software Engineer



   --  
   Best Regards!

   肖康(Kang Xiao,kxiao.ti...@gmail.com (mailto:kxiao.ti...@gmail.com))
   Distributed Software Engineer
   
  



Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans

2014-02-18 Thread Eddie Santos
Hi all,

How do you get bolts that take a ludicrously long time to load (we're
talking minutes here) to cooperate with Zookeeper?

I may not be understanding my problem properly, but on my test cluster
(**not** in local mode!) my bolt keeps getting restarted in the middle of
its prepare() method -- which may take up to two minutes to return.

The problem seems to be the "Client session timed out" message, but I'm not
knowledgeable enough with Zookeeper to really know how to fix this.

Here's a portion of logs from the supervisor affected. The STDIO messages
come from a poorly-coded third party library that I have to use.

2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out,
have not heard from server in 2747ms for sessionid 0x143a22eb4060078,
closing socket connection and attempting reconnect
2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
:storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
:port 6702}
2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
:storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
:port 6702}
2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State
change: SUSPENDED
2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are
no ConnectionStateListeners registered.
2014-01-17 23:19:28 b.s.cluster [WARN] Received event
:disconnected::none: with disconnected Zookeeper.
2014-01-17 23:19:28 b.s.cluster [WARN] Received event
:disconnected::none: with disconnected Zookeeper.
2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec].
2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma
2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner
2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from
edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz
2014-01-17 23:19:28 STDIO [ERROR] ...
2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
:storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
:port 6702}
2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
#backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
:storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
:port 6702}
2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection
to server zookeeper/192.168.50.3:2181

  ^-- This is where the bolt gets restarted in its initialization.

Thanks,
Eddie


Re: Bolts with long instantiation times -- A.K.A. Zookeeper shenanigans

2014-02-18 Thread Michael Rose
You may need to configure your cluster to give it more time to start up.
Additionally, knowing how long it can take to load the Stanford NLP models,
make sure you're only loading them once per worker (e.g. via a static
initializer or double-checked synchronization) and sharing them between all
your bolt instances.

supervisor.worker.start.timeout.secs: 120
supervisor.worker.timeout.secs: 60

I'd try tuning your worker start timeout here. Try setting it up to 300s
and (again) ensure your prepare method only initializes expensive
resources once, then shares them between instances in the JVM.
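
A sketch of that single-load pattern, with a hypothetical ExpensiveModel standing in for the Stanford NLP pipeline (double-checked locking on a volatile field, so every bolt's prepare() can call get() but only the first call in the worker JVM pays the loading cost):

```java
public class SharedModel {
    // Hypothetical stand-in for a model that takes minutes to load
    // (e.g. the Stanford NLP pipeline).
    static class ExpensiveModel {
        ExpensiveModel() {
            loadCount++; // the real constructor would load classifiers here
        }
    }

    static int loadCount = 0;
    private static volatile ExpensiveModel instance;

    // Call this from every bolt's prepare(): only the first caller in the
    // worker JVM pays the loading cost; later callers reuse the instance.
    public static ExpensiveModel get() {
        if (instance == null) {
            synchronized (SharedModel.class) {
                if (instance == null) {
                    instance = new ExpensiveModel();
                }
            }
        }
        return instance;
    }
}
```

The volatile keyword matters here: without it, a second thread could observe a partially constructed instance under the Java memory model.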

Michael Rose (@Xorlev https://twitter.com/xorlev)
Senior Platform Engineer, FullContact http://www.fullcontact.com/
mich...@fullcontact.com


On Tue, Feb 18, 2014 at 1:45 PM, Eddie Santos easan...@ualberta.ca wrote:

 Hi all,

 How do you get bolts that take a ludicrously long time to load (we're
 talking minutes here) to cooperate with Zookeeper?

 I may not be understanding my problem properly, but on my test cluster
 (**not** in local mode!) my bolt keeps getting restarted in the middle of
 its prepare() method -- which may take up to two minutes to return.

 The problem seems to be the  Client session timed out, but I'm not
 knowledgable enough with Zookeeper to really know how to fix this.

 Here's a portion of logs from the supervisor affected. The STDIO messages
 come from a poorly-coded third party library that I have to use.

 2014-01-17 23:19:28 o.a.z.ClientCnxn [INFO] Client session timed out,
 have not heard from server in 2747ms for sessionid 0x143a22eb4060078,
 closing socket connection and attempting reconnect
 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:28 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139768,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [INFO] State
 change: SUSPENDED
 2014-01-17 23:19:28 c.n.c.f.s.ConnectionStateManager [WARN] There are
 no ConnectionStateListeners registered.
 2014-01-17 23:19:28 b.s.cluster [WARN] Received event
 :disconnected::none: with disconnected Zookeeper.
 2014-01-17 23:19:28 b.s.cluster [WARN] Received event
 :disconnected::none: with disconnected Zookeeper.
 2014-01-17 23:19:28 STDIO [ERROR] done [7.2 sec].
 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator lemma
 2014-01-17 23:19:28 STDIO [ERROR] Adding annotator ner
 2014-01-17 23:19:28 STDIO [ERROR] Loading classifier from
 edu/stanford/nlp/models/ner/english.all.3class.distsim.crf.ser.gz
 2014-01-17 23:19:28 STDIO [ERROR] ...
 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:29 b.s.d.worker [DEBUG] Doing heartbeat
 #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 139769,
 :storm-id nlptools-test-1-139740, :executors #{[3 3] [6 6] [-1 -1]},
 :port 6702}
 2014-01-17 23:19:30 o.a.z.ClientCnxn [INFO] Opening socket connection
 to server zookeeper/192.168.50.3:2181

   ^-- This is where the bolt gets restarted in its initialization.

 Thanks,
 Eddie



Experiencing python (v2.7.5) thrift errors for v0.9 (getTopology / getTopologyInfo) ...

2014-02-18 Thread Noel Milton Vega

Hello:

I downloaded the latest Storm source yesterday (v0.9) and compiled the
python thrift bindings with it, like so:

/usr/bin/thrift --gen py storm.thrift

I installed the output into
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/


The resulting interface works more or less, but I've run into two things
variously while coding/testing throughout the day (from within an IDE and
also from within an interactive bpython session):


(1) Every other call to client.getTopologyInfo(id) /
client.getTopology(id) hangs and I have to Ctrl-C and re-issue it.
And it's literally 50%/50%:
It-Hangs / Doesn't-Hang / It-Hangs / Doesn't-Hang / ... (etc.).

(2) Next, for the 50% of the time where calls to
client.getTopologyInfo(id) / client.getTopology(id)
don't result in a hang, I'm getting 2 different types of exceptions:
a) TypeError: unhashable instance
b) unknown result

Below is a tabular paste of the typical sequence (using
client.getTopologyInfo(id) here).


I also want to note that the issue became worse after pushing data
through the previously idle topology (and perhaps this altered elements
of underlying data structures). Before that, client.getTopologyInfo(id)
was working. Finally, I should also mention that client.getUserTopology(id)
(not mentioned previously) has never worked. It always
returns TypeError: unhashable instance

Does anyone know what might be happening here?

Thank you in advance (and see below).

 client.getTopologyInfo('run01-2-1392747346')

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
586, in getTopologyInfo

return self.recv_getTopologyInfo()
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
610, in recv_getTopologyInfo
raise TApplicationException(TApplicationException.MISSING_RESULT, 
getTopologyInfo failed: unknown result);

TApplicationException: getTopologyInfo failed: unknown result



 client.getTopologyInfo('run01-2-1392747346')
 ctrl-C  (because it hung)

 client.getTopologyInfo('run01-2-1392747346')

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
586, in getTopologyInfo

return self.recv_getTopologyInfo()
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
604, in recv_getTopologyInfo

result.read(self._iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
2832, in read

self.success.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2726, in read

_elem265.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2602, in read

self.stats.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2393, in read

self.specific.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2282, in read

self.bolt.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
1927, in read

_val86[_key92] = _val93
TypeError: unhashable instance




Re: How to specify worker.childopts for a specified topology?

2014-02-18 Thread Link Wang
Yes! it works when using Config.TOPOLOGY_WORKER_CHILDOPTS.
Thanks a lot!


On Tue, Feb 18, 2014 at 11:50 PM, Derek Dagit der...@yahoo-inc.com wrote:

 Try this:

 conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, WORKER_OPTS);

 Your WORKER_OPTS should be appended to WORKER_CHILDOPTS.
 --
 Derek


 On 2/18/14, 1:47, Link Wang wrote:

 Dear all,
 I want to specify some worker.childopts for my topology inner it's code,
 and I use this way:
 conf.put(Config.WORKER_CHILDOPTS, WORKER_OPTS);
 but I found it doesn't work.

 I don't use storm.yaml file to set worker.childopts, because the memory
 requirement of my topologies are widely different.

 is there some one encounter the same problem?




Experiencing python (v2.7.5) thrift errors for v0.9 (getTopology / getTopologyInfo against nimbus) ...

2014-02-18 Thread Noel Milton Vega

Hello:

(Apologies. The first send of this email was formatted poorly and wasn't
very readable. Resent.)



I downloaded the latest Storm source yesterday (v0.9) and compiled the
python thrift bindings with it, like so:

/usr/bin/thrift --gen py storm.thrift


I installed the compile output into 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d



The resulting thrift interface works more or less, but I've run into two 
fatal things
variously while coding/testing throughout the day (from within an IDE, 
and also

from within an interactive bpython session):

---
   (1) Every other call to client.getTopologyInfo(id) / 
client.getTopology(id) hangs and

   I have to Ctrl-C and re-issue it. And it's literally 50%/50%:
   It-Hangs / Doesn't-Hang / It-Hangs / Doesn't-Hang / ... (etc.).


   (2) Next, for the 50% of the time where calls to 
client.getTopologyInfo(id) /
   client.getTopology(id) don't result in a hang, I'm getting 2 
different types

   of exceptions:
  a) TypeError: unhashable instance
  b) unknown result
---


Below is a tabular paste of the typical interaction sequence (using
client.getTopologyInfo(id) here).

I also want to note that the issue became worse after pushing data 
through the previously
idle topology (and perhaps this altered elements of underlying data 
structures). Before
that client.getTopologyInfo(id) was working. Finally, I should also 
mention that
client.getUserTopology(id) (not mentioned previously), has never worked. 
It always

returns TypeError: unhashable instance

Does anyone know what might be happening here?


Thank you in advance (see below).

# 
=

 client.getTopologyInfo('run01-2-1392747346')

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
586, in getTopologyInfo

return self.recv_getTopologyInfo()
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
610, in recv_getTopologyInfo
raise TApplicationException(TApplicationException.MISSING_RESULT, 
getTopologyInfo failed: unknown result);

TApplicationException: getTopologyInfo failed: unknown result

# 
===




# 
===

 client.getTopologyInfo('run01-2-1392747346')
 ctrl-C  (because it hung)
# 
===




# 
===

 client.getTopologyInfo('run01-2-1392747346')

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
586, in getTopologyInfo

return self.recv_getTopologyInfo()
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
604, in recv_getTopologyInfo

result.read(self._iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/Nimbus.py, line 
2832, in read

self.success.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2726, in read

_elem265.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2602, in read

self.stats.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2393, in read

self.specific.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
2282, in read

self.bolt.read(iprot)
  File 
/opt/STORM.d/latest/THRIFT.d/storm-0.9-python-thrift.d/storm/ttypes.py, line 
1927, in read

_val86[_key92] = _val93
  TypeError: unhashable instance
# 
=


trident: how to access groupBy() keys in a stream after aggregation

2014-02-18 Thread Matti Dahlbom
Hello, a beginner question coming up. I'm trying to build analytics
crunching with Storm Trident: from a continuous stream of events I need
to group/aggregate things and then write the aggregated results over a
time-slice into a database for quick access later on. I am starting with
the following topology:

TridentState state = topology.newStream("logspout", spout)
        .parallelismHint(8).each(new Fields("json"),
                                 new ProcessJsonFunction(),
                                 new Fields("ad_id", "zone",
                                            "impressions", "clicks"))
        .groupBy(new Fields("ad_id", "zone"))
        .chainedAgg()
        .aggregate(new Fields("impressions"), new Sum(),
                   new Fields("impressions_sum"))
        .aggregate(new Fields("clicks"), new Sum(),
                   new Fields("clicks_sum"))
        .chainEnd()
        .partitionPersist(new AnalyticsStateFactory(),
                          new Fields("impressions_sum", "clicks_sum"),
                          new AnalyticsStateUpdater());

And then in my class AnalyticsStateUpdater's method updateState() I would
like to store the aggregated values (impressions_sum, clicks_sum) per
key bucket, and here I ran into problems: how do I, in the StateUpdater,
know which groupBy() bucket the aggregated data belongs to? In other words,
I need to get the key formed from the values of the fields (ad_id, zone).
The aggregated values themselves end up properly Sum()-ed in the
StateUpdater.

I am aware this is probably trivial, but from the Trident documentation (or
the lack of it) I cannot seem to figure out how to do this.

BR,

- Matti


Re: trident: how to access groupBy() keys in a stream after aggregation

2014-02-18 Thread Svend Vanderveken
Hi Matti,

Use a persistentAggregate; it does precisely what you are describing:
it hands the result of an Aggregator to a Trident State so it can be saved
somewhere based on the groupBy bucket it belongs to.

Here's a blog post where I explain my understanding of how it works:
http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/
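
Conceptually, what a grouped persistentAggregate maintains is one running aggregate per groupBy key, and the key travels into the State alongside the sums, which is exactly the piece the partitionPersist/StateUpdater combination was missing. A plain-Java sketch of that per-key bookkeeping (not the Trident API; the tuple values are made up for illustration):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupedSums {
    public static void main(String[] args) {
        // (ad_id, zone, impressions, clicks) tuples, as the hypothetical
        // ProcessJsonFunction would emit them.
        Object[][] tuples = {
            {"ad1", "top", 10L, 1L},
            {"ad1", "top", 5L, 0L},
            {"ad2", "side", 7L, 2L},
        };
        // One [impressions_sum, clicks_sum] accumulator per (ad_id, zone)
        // key: the state sees the key alongside the sums.
        Map<List<String>, long[]> buckets = new LinkedHashMap<>();
        for (Object[] t : tuples) {
            long[] sums = buckets.computeIfAbsent(
                    List.of((String) t[0], (String) t[1]), k -> new long[2]);
            sums[0] += (Long) t[2];
            sums[1] += (Long) t[3];
        }
        buckets.forEach((k, v) -> System.out.println(k + " -> " + Arrays.toString(v)));
        // [ad1, top] -> [15, 1]
        // [ad2, side] -> [7, 2]
    }
}
```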

Cheers,

Svend








On Wed, Feb 19, 2014 at 8:09 AM, Matti Dahlbom mdahlbom...@gmail.com wrote:

 Hello, a beginner question coming up. I'm trying to build analytics
 crunching with Storm Trident; a continuous stream of events of which I need
 to group/aggregate things and then write the aggregated results over a
 time-slice into a database for quick access later on. I am starting with
 the following topology:

 TridentState state = topology.newStream("logspout", spout)
         .parallelismHint(8).each(new Fields("json"),
                                  new ProcessJsonFunction(),
                                  new Fields("ad_id", "zone",
                                             "impressions", "clicks"))
         .groupBy(new Fields("ad_id", "zone"))
         .chainedAgg()
         .aggregate(new Fields("impressions"), new Sum(),
                    new Fields("impressions_sum"))
         .aggregate(new Fields("clicks"), new Sum(),
                    new Fields("clicks_sum"))
         .chainEnd()
         .partitionPersist(new AnalyticsStateFactory(),
                           new Fields("impressions_sum", "clicks_sum"),
                           new AnalyticsStateUpdater());

 And then in my class AnalyticsStateUpdater:s method updateState() I would
 like to store the aggregated values (impressions_sum, clicks_sum) per
 key-bucket -- and here I ran into problems; how do I - in the
 StateUpdater - know to which groupBy() bucket the aggregated data belongs
 to? In other words I would need to get the key formed of the values of the
 fields (ad_id, zone). The aggregated values themselves end up properly
 Sum():ed in the StateUpdater.

 I am aware this is probably trivial but from the Trident documentation (or
 the lack of it) I cannot seem to figure out how to do this.

 BR,

 - Matti