Trident Metrics Consumer
I've been following the tutorials here (http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to) to create metrics in Storm. However, I am using Trident, which abstracts bolts away from the user. How can I go about creating metrics in Trident? Thanks -- Raphael Hsieh
Re: LoggingMetricsConsumer
Thanks for your response, John. Could you explain to me how this would work when I am using Trident? Since with Trident bolts are abstracted away from the user, how might I configure my own MetricsConsumerBolt / debug it to figure out why it isn't calling "handleDataPoints()"? My metricsConsumer's "prepare()" and "cleanup()" methods are called, but never the handleDataPoints() function. Thanks On Thu, Sep 25, 2014 at 1:33 PM, John Reilly wrote: > It is called by the MetricsConsumerBolt which is created by storm when a > worker is starting up. When you define a metrics consumer, you should see > metrics output every 60 seconds. Also, I think the metrics code was only > introduced in 0.9.0 so you would need to be running at least that version. > > One other issue I ran into when registering a metrics consumer was that > the config args I was passing initially failed because of serialization issues. > When I used a Map instead of a serializable class that I created, it worked > fine. For the packaged LoggingMetricsConsumer there is no config though. > I think I did run into an issue when trying to configure > both LoggingMetricsConsumer and my own metrics consumer. IIRC, if > initialization of my consumer failed, the LoggingMetricsConsumer would also > fail. It may have depended on the order that I was registering them in, > but I don't remember exactly. > > Cheers, > John > > On Thu, Sep 25, 2014 at 10:07 AM, Raphael Hsieh > wrote: > >> Hi, I've been trying to figure out why registering a >> LoggingMetricsConsumer isn't working for me. >> >> I've been able to figure out that it is indeed running, however the >> "handleDataPoints()" function is never called. Can someone explain to me >> how this class is used by Storm in order to log metrics? >> When is the handleDataPoints function called? >> >> Thanks >> >> -- >> Raphael Hsieh >> >> >> > > -- Raphael Hsieh
LoggingMetricsConsumer
Hi, I've been trying to figure out why registering a LoggingMetricsConsumer isn't working for me. I've been able to figure out that it is indeed running; however, the "handleDataPoints()" function is never called. Can someone explain to me how this class is used by Storm in order to log metrics? When is the handleDataPoints function called? Thanks -- Raphael Hsieh
Re: metrics consumer logging stormUI data
In order to get this / have the metrics consumer work, do I need to have the setDebug attribute set to true? On Mon, Sep 22, 2014 at 12:59 PM, Harsha wrote: > Here is what I see in the metrics.log > > 2014-09-22 09:44:31,321 731751411404271 localhost:6703 > 19:split __transfer-count{default=2680} > > 2014-09-22 09:44:31,321 731751411404271 localhost:6703 > 19:split __execute-latency {spout:default=0.0} > > 2014-09-22 09:44:31,321 731751411404271 localhost:6703 > 19:split __fail-count{} > > 2014-09-22 09:44:31,321 731751411404271 localhost:6703 > 19:split __emit-count{default=2680} > > 2014-09-22 09:44:31,321 731751411404271 localhost:6703 > 19:split __execute-count {spout:default=420} > > 2014-09-22 09:44:31,352 731791411404271 localhost:6703 > 22:split __ack-count {spout:default=420} > > 2014-09-22 09:44:31,352 731791411404271 localhost:6703 > 22:split __sendqueue {write_pos=2679, capacity=1024, > read_pos=2679, population=0} > I do see all the UI related counts coming in the metrics.log. > > -Harsha > > > On Mon, Sep 22, 2014, at 10:41 AM, Raphael Hsieh wrote: > > Hi Harsha, > Did you have to bind the metrics consumer to the default StormUI metrics > at all? Or do those automagically get included ? > > Thanks! > > On Mon, Sep 22, 2014 at 10:33 AM, Otis Gospodnetic < > otis.gospodne...@gmail.com> wrote: > > Hi Gezim, > > On Fri, Sep 19, 2014 at 7:27 PM, Gezim Musliaj wrote: > > Hey Otis, I was just registered at sematext and I can say that this is > what I have been looking for.I have just one question, what about the > delays between the SPM and the Storm Cluster (if they do exist), whats the > worst case? I mean because these metrics are not calculated locally, but > using an internet connection. > > > > The worst case is that somebody unplugs your servers from the network, but > if that happens you have bigger problems to deal with. 
In all seriousness, > Storm (local) => SPM (remote/cloud/saas) is not really a problem -- lots of > people successfully use SPM for monitoring Storm, Hadoop, Kafka, and other > types of systems. > > Otis > -- > Monitoring * Alerting * Anomaly Detection * Centralized Log Management > Solr & Elasticsearch Support * http://sematext.com/ > > > > > Thanks ! > > On Sat, Sep 20, 2014 at 1:15 AM, Otis Gospodnetic < > otis.gospodne...@gmail.com> wrote: > > Raphael, > > Not sure if this is what you are after, but SPM <http://sematext.com/spm/> > will collect and graph all Storm metrics, let you do alerting and anomaly > detection on them, etc. If you want to graph custom metrics (e.g. > something from your bolts), you can send them in as custom metrics > <https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics> and > again graph them, alert on them, do anomaly detection on them, stick them > on dashboards, etc. If you want to emit events from your bolts, you can send > events to SPM > <https://sematext.atlassian.net/wiki/display/PUBSPM/Events+Integration>, > too, or you can send them to Logsene <http://www.sematext.com/logsene/>... > can be handy for correlation with alerts and performance graphs when > troubleshooting. Here are some Storm metrics graph: > http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/ > > I hope this helps. > > Otis > -- > Monitoring * Alerting * Anomaly Detection * Centralized Log Management > Solr & Elasticsearch Support * http://sematext.com/ > > > On Fri, Sep 19, 2014 at 6:12 PM, Raphael Hsieh > wrote: > > Hi, > Using Storm/Trident, how do I register a metrics consumer to log the data > I get in the StormUI ? > I want to look at historical data of my topology, for example the execute > latency of the topology over time, as this would give me good insight as to > where things might be going wrong when the system breaks. 
> > I have been following the steps outlined in the BigData CookBook here: > http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to > > However I am not wanting to create my own metrics, instead I just want to > log the metrics that already exist built in to Storm. It is unclear to me > how I am supposed to go about doing that. > > Thanks > > -- > Raphael Hsieh > > > > > > > > > > > > > > > -- > Raphael Hsieh > > > > > > -- Raphael Hsieh
Re: metrics consumer logging stormUI data
Hi Harsha, Did you have to bind the metrics consumer to the default StormUI metrics at all? Or do those automagically get included ? Thanks! On Mon, Sep 22, 2014 at 10:33 AM, Otis Gospodnetic < otis.gospodne...@gmail.com> wrote: > Hi Gezim, > > On Fri, Sep 19, 2014 at 7:27 PM, Gezim Musliaj wrote: > >> Hey Otis, I was just registered at sematext and I can say that this is >> what I have been looking for.I have just one question, what about the >> delays between the SPM and the Storm Cluster (if they do exist), whats the >> worst case? I mean because these metrics are not calculated locally, but >> using an internet connection. >> > > The worst case is that somebody unplugs your servers from the network, but > if that happens you have bigger problems to deal with. In all seriousness, > Storm (local) => SPM (remote/cloud/saas) is not really a problem -- lots of > people successfully use SPM for monitoring Storm, Hadoop, Kafka, and other > types of systems. > > Otis > -- > Monitoring * Alerting * Anomaly Detection * Centralized Log Management > Solr & Elasticsearch Support * http://sematext.com/ > > > > >> Thanks ! >> >> On Sat, Sep 20, 2014 at 1:15 AM, Otis Gospodnetic < >> otis.gospodne...@gmail.com> wrote: >> >>> Raphael, >>> >>> Not sure if this is what you are after, but SPM >>> <http://sematext.com/spm/> will collect and graph all Storm metrics, >>> let you do alerting and anomaly detection on them, etc. If you want to >>> graph custom metrics (e.g. something from your bolts), you can send them in >>> as custom metrics >>> <https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics> and >>> again graph them, alert on them, do anomaly detection on them, stick them >>> on dashboards, etc. If you want to emit events from your bolts, you can >>> send >>> events to SPM >>> <https://sematext.atlassian.net/wiki/display/PUBSPM/Events+Integration>, >>> too, or you can send them to Logsene <http://www.sematext.com/logsene/>... 
>>> can be handy for correlation with alerts and performance graphs when >>> troubleshooting. Here are some Storm metrics graph: >>> http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/ >>> >>> >>> I hope this helps. >>> >>> Otis >>> -- >>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management >>> Solr & Elasticsearch Support * http://sematext.com/ >>> >>> >>> On Fri, Sep 19, 2014 at 6:12 PM, Raphael Hsieh >>> wrote: >>> >>>> Hi, >>>> Using Storm/Trident, how do I register a metrics consumer to log the >>>> data I get in the StormUI ? >>>> I want to look at historical data of my topology, for example the >>>> execute latency of the topology over time, as this would give me good >>>> insight as to where things might be going wrong when the system breaks. >>>> >>>> I have been following the steps outlined in the BigData CookBook here: >>>> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to >>>> >>>> However I am not wanting to create my own metrics, instead I just want >>>> to log the metrics that already exist built in to Storm. It is unclear to >>>> me how I am supposed to go about doing that. >>>> >>>> Thanks >>>> >>>> -- >>>> Raphael Hsieh >>>> >>>> >>>> >>>> >>> >>> >> > -- Raphael Hsieh
Re: metrics consumer logging stormUI data
Thanks Harsha and Otis for your prompt responses. I'm looking to somehow log these metrics to use for an in-house monitoring system. I don't want to get user-provided metrics just yet. From what I've gathered from the big data cookbook, I just want to create a metrics consumer to read these metrics and print them out to a log file. In order to do this I have added to my config: config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2); which should create a LoggingMetricsConsumer with a parallelism of 2 (I believe). I was led to believe that these logs would be put in a file called "metrics.log". However, after adding this to my topology I have been unable to find such a log. If someone could explain to me what I might be missing, that would be great. Thanks! On Fri, Sep 19, 2014 at 4:27 PM, Gezim Musliaj wrote: > Hey Otis, I was just registered at sematext and I can say that this is > what I have been looking for. I have just one question: what about the > delays between the SPM and the Storm Cluster (if they do exist), whats the > worst case? I mean because these metrics are not calculated locally, but > using an internet connection. > Thanks ! > > On Sat, Sep 20, 2014 at 1:15 AM, Otis Gospodnetic < > otis.gospodne...@gmail.com> wrote: > >> Raphael, >> >> Not sure if this is what you are after, but SPM >> <http://sematext.com/spm/> will collect and graph all Storm metrics, let >> you do alerting and anomaly detection on them, etc. If you want to graph >> custom metrics (e.g. something from your bolts), you can send them in as >> custom >> metrics >> <https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics> and >> again graph them, alert on them, do anomaly detection on them, stick them >> on dashboards, etc. If you want to emit events from your bolts, you can send >> events to SPM >> <https://sematext.atlassian.net/wiki/display/PUBSPM/Events+Integration>, >> too, or you can send them to Logsene <http://www.sematext.com/logsene/>... 
>> can be handy for correlation with alerts and performance graphs when >> troubleshooting. Here are some Storm metrics graph: >> http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/ >> >> >> I hope this helps. >> >> Otis >> -- >> Monitoring * Alerting * Anomaly Detection * Centralized Log Management >> Solr & Elasticsearch Support * http://sematext.com/ >> >> >> On Fri, Sep 19, 2014 at 6:12 PM, Raphael Hsieh >> wrote: >> >>> Hi, >>> Using Storm/Trident, how do I register a metrics consumer to log the >>> data I get in the StormUI ? >>> I want to look at historical data of my topology, for example the >>> execute latency of the topology over time, as this would give me good >>> insight as to where things might be going wrong when the system breaks. >>> >>> I have been following the steps outlined in the BigData CookBook here: >>> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to >>> >>> However I am not wanting to create my own metrics, instead I just want >>> to log the metrics that already exist built in to Storm. It is unclear to >>> me how I am supposed to go about doing that. >>> >>> Thanks >>> >>> -- >>> Raphael Hsieh >>> >>> >>> >>> >> >> > -- Raphael Hsieh
metrics consumer logging stormUI data
Hi, Using Storm/Trident, how do I register a metrics consumer to log the data I get in the StormUI ? I want to look at historical data of my topology, for example the execute latency of the topology over time, as this would give me good insight as to where things might be going wrong when the system breaks. I have been following the steps outlined in the BigData CookBook here: http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to However I am not wanting to create my own metrics, instead I just want to log the metrics that already exist built in to Storm. It is unclear to me how I am supposed to go about doing that. Thanks -- Raphael Hsieh
Re: Parallelism for KafkaSpout
How do you tell how many partitions are used by a specific bolt in the topology? On Wed, Jul 23, 2014 at 2:15 PM, Nathan Leung wrote: > In your example, five spouts would get data and the other five would not. > On Jul 23, 2014 5:11 PM, "Kashyap Mhaisekar" wrote: > >> Hi, >> Is the no. of executors for KafkaSpout dependent on the partitions for >> the topic? >> For E.g., >> Say kafka TopicA has 5 partitions. >> If I have a KafkaSpout that has the parallelism hint set to 10, will all >> the executors get the data? Or am I constrained by the no. of partitions >> declared for the topic? >> >> Regards, >> Kashyap >> > -- Raphael Hsieh
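Nathan's answer can be sketched numerically: with Kafka 0.8+, each topic partition is consumed by at most one spout task, so only min(executors, partitions) spout executors ever receive data. A minimal illustration (plain Java; the class and method names here are hypothetical, not Storm API):

```java
public class SpoutParallelism {
    // With Kafka 0.8+, a topic partition is consumed by at most one
    // KafkaSpout task, so the number of spout executors that actually
    // receive data is capped by the topic's partition count.
    static int activeExecutors(int spoutExecutors, int topicPartitions) {
        return Math.min(spoutExecutors, topicPartitions);
    }

    public static void main(String[] args) {
        // Kashyap's example: parallelism hint of 10 on a 5-partition topic
        int active = activeExecutors(10, 5);
        System.out.println(active + " executors get data, " + (10 - active) + " sit idle");
    }
}
```

To raise spout parallelism past this cap you would add partitions to the topic, as Chi describes later in this digest.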
Re: Naming Components In Trident Topology
Is there any follow-up to this thread? http://mail-archives.apache.org/mod_mbox/storm-user/201312.mbox/%3C29605543.1418.1386695558006.JavaMail.Laurent@Laurent-PC%3E For some reason when I make names for each bolt, some of the names seem to persist to the downstream bolts and this gets confusing. Is there a way to clearly label each of my bolts with unique names that don't spill over to other bolts? On Thu, Jun 26, 2014 at 7:22 AM, Justin Workman wrote: > Thanks for the replies. This does seem to work! > > > On Wed, Jun 25, 2014 at 5:47 PM, Sam wrote: > >> I believe you can just call #name. E.g. persistentAggregate( ... >> ).name( "name" ) >> -- >> From: Justin Workman >> Sent: 6/25/2014 8:56 AM >> To: user@storm.incubator.apache.org >> Subject: Naming Components In Trident Topology >> >> This is probably a silly question, that I am just overlooking the answer >> to. We are playing with our first trident topology running under storm >> 0.9.1. Is there a way to descriptively name the components so they are >> meaningful in the UI. Similar to how the components appear in the normal >> storm topology. >> >> Currently the UI shows >> >> $mastercoord-bg0 for the spout >> >> then the following for the bolts >> $spoutcoord-spout0 >> b-1 >> b-0 >> >> Is there anyway to make this more friendly. >> >> Thanks >> Justin >> > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Re: Max Spout Pending
Is there a way to tell how many batches my topology processes per second ? Or for that matter how many tuples are processed per second ? Aside from creating a new bolt purely for that aggregation ? On Mon, Jul 14, 2014 at 2:08 PM, Carlos Rodriguez wrote: > Max spout pending config specifies how many *batches* can be processed > simultaneously by your topology. > Thats why 48,000 seems absurdly high to you. Divide it between the batch > size and you'll get the max spout pending config that you were expecting. > > > 2014-07-14 19:00 GMT+02:00 Raphael Hsieh : > > What is the optimal max spout pending to use in a topology ? >> I found this thread here: >> http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3cca+avhzatfg_s88lkombvommkh-rafwr6szy0i8b8tm3rfab...@mail.gmail.com%3E >> that didn't seem to have a follow up. >> >> Part of it says to >> >> "Start with a max spout pending that is for sure too small -- one for >> trident, or the number of executors for storm -- and increase it until you >> stop seeing changes in the flow. You'll probably end up with something >> near 2*(throughput >> in recs/sec)*(end-to-end latency) (2x the Little's law capacity)." >> >> Does this make sense for a Max Spout Pending value ? >> I expect my topology to have a throughput of around 80,000/s and I've >> been seeing a complete latency of around 300ms, so given this formula, I'd >> want 2*8*.3 = 48,000 Max Spout Pending. >> >> This seems absurdly high to me.. >> >> -- >> Raphael Hsieh >> >> >> >> > > > > -- > Carlos Rodríguez > Developer at ENEO Tecnología > http://redborder.net/ > http://lnkd.in/bgfCVF9 > -- Raphael Hsieh
Max Spout Pending
What is the optimal max spout pending to use in a topology? I found this thread here: http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3cca+avhzatfg_s88lkombvommkh-rafwr6szy0i8b8tm3rfab...@mail.gmail.com%3E that didn't seem to have a follow-up. Part of it says to "Start with a max spout pending that is for sure too small -- one for trident, or the number of executors for storm -- and increase it until you stop seeing changes in the flow. You'll probably end up with something near 2*(throughput in recs/sec)*(end-to-end latency) (2x the Little's law capacity)." Does this make sense for a Max Spout Pending value? I expect my topology to have a throughput of around 80,000/s and I've been seeing a complete latency of around 300ms, so given this formula, I'd want 2 * 80,000 * 0.3 = 48,000 Max Spout Pending. This seems absurdly high to me... -- Raphael Hsieh
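The rule of thumb quoted in the question is pending ≈ 2 × throughput × end-to-end latency. A back-of-envelope check of the numbers above (plain arithmetic in a hypothetical helper, not Storm API code):

```java
public class MaxSpoutPendingEstimate {
    // Rule of thumb from the quoted thread: roughly twice the
    // Little's-law capacity, i.e. 2 * throughput * latency.
    static long estimate(double tuplesPerSec, double latencySec) {
        return Math.round(2 * tuplesPerSec * latencySec);
    }

    public static void main(String[] args) {
        // 80,000 tuples/sec at 300 ms complete latency
        System.out.println(estimate(80_000, 0.3)); // 48000
    }
}
```

As Carlos points out in the reply, Trident's max spout pending counts *batches*, not tuples, so this tuple-level figure should be divided by the batch size before it is used as the config value.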
Re: Spout process latency
So the issue I'm having is that when my system is healthy my "spout0" bolt will have around 66ms of latency. Every once in a while my upstream dependency that pushes data into a Kafka stream will spike to maybe 10,000ms latency. However, instead of increasing my topology's latency by only 10,000ms, mine shoots up to roughly 100,000ms, and I'm not entirely sure why. I took a look at the Storm UI and it seems that the latency of my system isn't too bad at all, EXCEPT for the spout0 bolt. That guy jumps to around 150ms process latency. So I'm trying to figure out why this might be. On Wed, Jul 9, 2014 at 1:28 PM, Derek Dagit wrote: > It should be a windowed average measure of the time between when the > component receives a tuple and when it acks the tuple. > > This can be slower if there is batching, aggregating, or joining happening > (the component must wait for a number of other tuples to arrive before it > can ack). > > On the UI, there are tool tips that explain the measurements. They appear > after hovering over the label. > -- > Derek > > > On 7/9/14, 15:22, Raphael Hsieh wrote: > >> Can somebody explain to me what might cause the spout to have a large >> process latency? Currently my spout0 and $spoutcoord-spout0 have latencies >> higher than I would like. >> I'm consuming data from a Kafka stream. >> >> How is this process latency measured ? >> Is it measuring the amount of time it takes to fill a batch with data and >> send it to the first bolt in the topology? >> >> Thanks >> >> -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Spout process latency
Can somebody explain to me what might cause the spout to have a large process latency? Currently my spout0 and $spoutcoord-spout0 have latencies higher than I would like. I'm consuming data from a Kafka stream. How is this process latency measured? Is it measuring the amount of time it takes to fill a batch with data and send it to the first bolt in the topology? Thanks -- Raphael Hsieh
topology system metrics
Is there a way to get hold of the topology's system metrics and send them to an external datastore such as DynamoDB? -- Raphael Hsieh
Re: key values in PersistentAggregate
actually I think this is a non-issue, given the field exists in the stream already, I should be able to access it right ? On Wed, Jul 2, 2014 at 10:27 AM, Raphael Hsieh wrote: > From my understanding, if I implement my own state factory to use in > PersistentAggregate, the grouping fields will be the key values in the > state, > However, if I want to have access to other fields in the aggregation, how > might I get those ? From my understanding, doing a groupBy() will create a > new GroupedStream which will only have the fields specified in the > groupBy(). > > Essentially what I want to do is: > stream > .groupBy(new Fields("a")) > .persistentAggregate( > new Factory(), > new Fields("b"), > ... > ) > > How would I do this ? > > -- > Raphael Hsieh > > > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
key values in PersistentAggregate
From my understanding, if I implement my own state factory to use in PersistentAggregate, the grouping fields will be the key values in the state. However, if I want to have access to other fields in the aggregation, how might I get those? From my understanding, doing a groupBy() will create a new GroupedStream which will only have the fields specified in the groupBy(). Essentially what I want to do is: stream .groupBy(new Fields("a")) .persistentAggregate( new Factory(), new Fields("b"), ... ) How would I do this? -- Raphael Hsieh
Re: using CachedMap in a trident state
If we don't serialize the data when we store it in the cache, doesn't that defeat the purpose of having an OpaqueValue in order to keep transactional consistency and the exactly-once processing semantics? On Thu, Jun 12, 2014 at 8:57 AM, Raphael Hsieh wrote: > How come we only serialize data when storing it into the external > datastore and not the local cache then? > > > On Thu, Jun 12, 2014 at 7:56 AM, Romain Leroux > wrote: > >> >> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java#L62 >> localCacheSize sets the size of the LRU cache used locally, so no, >> CachedMap data are not replicated among all hosts. >> >> The idea is that you use this on a grouped stream to store the result of >> your aggregation. >> So based on the hash (the groupBy fields) all related tuples always go to >> the same host. >> You want to avoid storing the same cached data in all hosts, otherwise >> scaling out is meaningless. >> >> >> >> 2014-06-12 6:58 GMT+09:00 Raphael Hsieh : >> >> I am imitating the MemCachedState class found here: >>> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java >>> . >>> >>> I was wondering what the CachedMap is used for. I see that it creates a >>> cache layer between the final datastore in order to retrieve values much >>> quicker than accessing the datastore every time, however I am unsure about >>> a couple details. >>> >>> Is this cache replicated among all hosts? When I do a 'multiGet' I >>> expect to retrieve data I had previously stored. If the cache is specific >>> to each host, I wouldn't necessarily get the same data I had most recently >>> stored. >>> >>> Also, how does this work with Opaque Transactional consistency? It seems >>> that in the MemCachedState example we serialize the data once we store it >>> to the external datastore, however the data in the cache is not serialized. >>> why is this? 
>>> Shouldn't the local cache have the same data as the external datastore ? >>> >>> thanks >>> -- >>> Raphael Hsieh >>> >>> >>> >>> >> >> > > > -- > Raphael Hsieh > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Re: using CachedMap in a trident state
How come we only serialize data when storing it into the external datastore and not the local cache then? On Thu, Jun 12, 2014 at 7:56 AM, Romain Leroux wrote: > > https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java#L62 > localCacheSize sets the size of the LRU cache used locally, so no > CachedMap data are not replicated among all host. > > The idea is that you use this on a grouped stream to store the result of > your aggregation. > So based on the hash (the groupBy fields) all related tuples always go to > the same host. > You want to avoid storing the same cached data in all hosts, otherwise > scaling out is meaningless. > > > > 2014-06-12 6:58 GMT+09:00 Raphael Hsieh : > > I am imitating the MemCachedState class found here: >> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java >> . >> >> I was wondering what the CachedMap is used for. I see that it creates a >> cache layer between the final datastore in order to retrieve values much >> quicker than accessing the datastore every time, however I am unsure about >> a couple details. >> >> Is this cache replicated among all hosts? When I do a 'multiGet' I expect >> to retrieve data I had previously stored. If the cache is specific to each >> host, I wouldn't necessarily get the same data I had most recently stored. >> >> Also, how does this work with Opaque Transactional consistency? It seems >> that in the MemCachedState example we serialize the data once we store it >> to the external datastore, however the data in the cache is not serialized. >> why is this? >> Shouldn't the local cache have the same data as the external datastore ? >> >> thanks >> -- >> Raphael Hsieh >> >> >> >> > > -- Raphael Hsieh
using CachedMap in a trident state
I am imitating the MemCachedState class found here: https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java . I was wondering what the CachedMap is used for. I see that it creates a cache layer in front of the final datastore in order to retrieve values much more quickly than accessing the datastore every time; however, I am unsure about a couple of details. Is this cache replicated among all hosts? When I do a 'multiGet' I expect to retrieve data I had previously stored. If the cache is specific to each host, I wouldn't necessarily get the same data I had most recently stored. Also, how does this work with Opaque Transactional consistency? It seems that in the MemCachedState example we serialize the data once we store it to the external datastore, however the data in the cache is not serialized. Why is this? Shouldn't the local cache have the same data as the external datastore? Thanks -- Raphael Hsieh
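The read-through idea behind CachedMap, as discussed in this thread, can be sketched in plain Java. This is a hedged illustration, not Trident's actual implementation: a local LRU cache sits in front of a backing store (a HashMap stands in for memcached), multiGet consults the cache first and only asks the backing store for misses, and the cache is per-worker, not replicated:

```java
import java.util.*;

// Sketch of a read-through cache in the spirit of Trident's CachedMap.
// The class and method names are illustrative, not the real API.
class ReadThroughMap<K, V> {
    private final Map<K, V> backing;          // stand-in for the remote store
    private final LinkedHashMap<K, V> cache;  // local, per-worker LRU cache

    ReadThroughMap(Map<K, V> backing, final int cacheSize) {
        this.backing = backing;
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override protected boolean removeEldestEntry(Map.Entry<K, V> e) {
                return size() > cacheSize; // evict least-recently-used entry
            }
        };
    }

    List<V> multiGet(List<K> keys) {
        List<V> out = new ArrayList<>();
        for (K k : keys) {
            V v = cache.get(k);
            if (v == null && (v = backing.get(k)) != null) cache.put(k, v);
            out.add(v);
        }
        return out;
    }

    void multiPut(List<K> keys, List<V> vals) {
        for (int i = 0; i < keys.size(); i++) {
            cache.put(keys.get(i), vals.get(i));
            backing.put(keys.get(i), vals.get(i)); // write through to the store
        }
    }
}
```

Because a grouped stream routes each key to a fixed task, that task's local cache stays coherent with the store for its own keys, which is why the cache entries do not need the serialized (opaque) wrapper that the remote store uses.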
is storm.trident LRUMap distributed among hosts?
Is the storm.trident.util.LRUMap distributed among all the hosts in the storm cluster? If not, is there any way to combine this with a memcache? -- Raphael Hsieh
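As far as I can tell, a map like this is a plain in-process LRU structure, so each worker JVM holds its own copy and nothing is replicated across hosts. The behavior can be sketched with an access-ordered LinkedHashMap (a stand-in for illustration, not Storm's actual class):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in LRU map built on LinkedHashMap's access-order mode.
// It lives entirely inside one JVM: sharing state across hosts
// requires an external store (e.g. a memcached-backed Trident state),
// not this map.
class SimpleLRUMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    SimpleLRUMap(int maxSize) {
        super(16, 0.75f, true); // true = access-ordered iteration
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxSize; // evict the least-recently-used entry
    }
}
```

Combining a local LRU map with memcache is essentially what the MemcachedState/CachedMap pairing discussed elsewhere in this digest does: the LRU map absorbs hot reads, and the remote store holds the authoritative data.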
Re: how does PersistentAggregate distribute the DB Calls ?
Thanks for your quick reply, Nathan. So I'm doing some debugging of my topology, and I've removed all the logic from my MultiPut function, replacing it with a single System.out.println(). Then I am monitoring my logs to check when this gets printed out. It looks like every single one of my hosts (workers) hits this. Does this then indicate that I am processing many many partitions that each hit this multiPut and print out? Thanks. On Tue, Jun 3, 2014 at 3:29 PM, Nathan Marz wrote: > When possible it will do as much aggregation Storm-side so as to minimize > the amount it needs to interact with the database. So if you do a persistent global > count, for example, it will compute the count for the batch (in parallel), > and then the task that finishes the global count will do a single > get/update/put to the database. > > > On Tue, Jun 3, 2014 at 3:08 PM, Raphael Hsieh > wrote: > >> How does PersistentAggregate distribute the database calls across all the >> worker nodes ? >> Does it do the global aggregation then choose a single host to do a >> multiget/multiput to the external db ? >> >> Thanks >> -- >> Raphael Hsieh >> >> >> >> > > > > -- > Twitter: @nathanmarz > http://nathanmarz.com > -- Raphael Hsieh
how does PersistentAggregate distribute the DB Calls ?
How does PersistentAggregate distribute the database calls across all the worker nodes? Does it do the global aggregation then choose a single host to do a multiget/multiput to the external db? Thanks -- Raphael Hsieh
Batch size never seems to increase
I'm trying to figure out why my batch size never seems to get any bigger than 83K tuples. It could be because this is the throughput of my spout, however I don't believe this to be the case, as I believe the spout is backing up (I'm not processing the tuples as quickly as they are being produced). Currently I'm just using a barebones topology that looks like this: Stream spout = topology.newStream("...", ...) .parallelismHint(x) .groupBy(new Fields("time")) .aggregate(new Count(), new Fields("count")) .parallelismHint(x) .each(new Fields("time", "count"), new PrintFilter()); All the stream is doing is aggregating on like timestamps and printing out the count. In my config I've set the batch size to 10MB like so: Config config = new Config(); config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10); When I have the batch size at 5MB or even 1MB there is no difference; everything always adds up to roughly 83K tuples. In order to count up how many tuples are in the batch, I take a look at the system timestamp of when things are printed out (in the print filter) and, for all the print statements that have the same timestamp, I add the count values up together. When I compare the system timestamp of when the batch was processed with the tuple timestamps (that they were aggregated on), I am falling behind. This leads me to believe that the spout is emitting more than the number of tuples I am processing, so there should be more than 83K tuples per batch. If anyone has insight into this it would be greatly appreciated. Thanks! -- Raphael Hsieh
Re: Optimizing Kafka Stream
Oh ok. Thanks Chi! Do you have any ideas about why my batch size never seems to get any bigger than 83K tuples? Currently I'm just using a barebones topology that looks like this: Stream spout = topology.newStream("...", ...) .parallelismHint() .groupBy(new Fields("time")) .aggregate(new Count(), new Fields("Count")) .parallelismHint() .each(new Fields("time", "count"), new PrintFilter()); All the stream is doing is aggregating on like timestamps and printing out the count. In my config I've set the batch size to 10MB like so: Config config = new Config(); config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10); When I have the batch size at 5MB or even 1MB there is no difference; everything always adds up to roughly 83K tuples. In order to count up how many tuples are in the batch, I take a look at the system timestamp of when things are printed out (in the print filter) and, for all the print statements that have the same timestamp, I add the count values up together. On Mon, Jun 2, 2014 at 10:09 AM, Chi Hoang wrote: > Raphael, > The number of partitions is defined in your Kafka configuration - > http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions) > - or when you create the topic. The behavior is different for each version > of Kafka, so you should read more documentation. Your topology needs to > match the Kafka configuration for the topic. > > Chi > > > On Mon, Jun 2, 2014 at 8:46 AM, Raphael Hsieh > wrote: > >> Thanks for the tips Chi, >> I'm a little confused about the partitioning. I had thought that the >> number of partitions was determined by the amount of parallelism in the >> topology. For example if I said .parallelismHint(4), then I would have 4 >> different partitions. Is this not the case ? >> Is there a set number of partitions my topology has that I need to >> increase in order to have higher parallelism ? 
>> >> Thanks >> >> >> On Sat, May 31, 2014 at 11:50 AM, Chi Hoang wrote: >> >>> Raphael, >>> You can try tuning your parallelism (and num workers). >>> >>> For Kafka 0.7, your spout parallelism could max out at: # brokers x # >>> partitions (for the topic). If you have 4 Kafka brokers, and your topic >>> has 5 partitions, then you could set the spout parallelism to 20 to >>> maximize the throughput. >>> >>> For Kafka 0.8+, your spout parallelism could max out at # partitions for >>> the topic, so if your topic has 5 partitions, then you would set the spout >>> parallelism to 5. To increase parallelism, you would need to increase the >>> number of partitions for your topic (by using the add partitions utility). >>> >>> As for the number of workers, setting it to 1 means that your spout will >>> only run on a single Storm node, and would likely share resources with >>> other Storm processes (spouts and bolts). I recommend to increase the >>> number of workers so Storm has a chance to spread out the work, and keep a >>> good balance. >>> >>> Hope this helps. >>> >>> Chi >>> >>> >>> On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh >>> wrote: >>> >>>> I am in the process of optimizing my stream. Currently I expect 5 000 >>>> 000 tuples to come out of my spout per minute. I am trying to beef up my >>>> topology in order to process this in real time without falling behind. >>>> >>>> For some reason my batch size is capping out at 83 thousand tuples. I >>>> can't seem to make it any bigger. the processing time doesn't seem to get >>>> any smaller than 2-3 seconds either. >>>> I'm not sure how to configure the topology to get any faster / more >>>> efficient. >>>> >>>> Currently all the topology does is a groupby on time and an aggregation >>>> (Count) to aggregate everything. >>>> >>>> Here are some data points i've figured out. 
>>>> >>>> Batch Size:5mb >>>> num-workers: 1 >>>> parallelismHint: 2 >>>> (I'll write this a 5mb, 1, 2) >>>> >>>> 5mb, 1, 2 = 83K tuples / 6s >>>> 10mb, 1, 2 = 83k / 7s >>>> 5mb, 1, 4 = 83k / 6s >>>> 5mb, 2, 4 = 83k / 3s >>>> 5mb, 3, 6 = 83k / 3s >>>> 10mb, 3, 6 = 83k / 3s >>>> >>>> Can anybody help me figure out how to get it to process things faster ? >>>> >>>> My maxSpoutPending is at 1, but when I increased it to 2 it was the >>>> same. MessageTimeoutSec = 100 >>>> >>>> I've been following this blog: https://gist.github.com/mrflip/5958028 >>>> to an extent, not everything word for word though. >>>> >>>> I need to be able to process around 66,000 tuples per second and I'm >>>> starting to run out of ideas. >>>> >>>> Thanks >>>> >>>> -- >>>> Raphael Hsieh >>>> >>>> >>>> >>> >>> >>> >> >> >> -- >> Raphael Hsieh >> >> >> >> > > > > -- > Data Systems Engineering > data-syst...@groupon.com > -- Raphael Hsieh
Re: Optimizing Kafka Stream
Thanks for the tips Chi, I'm a little confused about the partitioning. I had thought that the number of partitions was determined by the amount of parallelism in the topology. For example if I said .parallelismHint(4), then I would have 4 different partitions. Is this not the case ? Is there a set number of partitions my topology has that I need to increase in order to have higher parallelism ? Thanks On Sat, May 31, 2014 at 11:50 AM, Chi Hoang wrote: > Raphael, > You can try tuning your parallelism (and num workers). > > For Kafka 0.7, your spout parallelism could max out at: # brokers x # > partitions (for the topic). If you have 4 Kafka brokers, and your topic > has 5 partitions, then you could set the spout parallelism to 20 to > maximize the throughput. > > For Kafka 0.8+, your spout parallelism could max out at # partitions for > the topic, so if your topic has 5 partitions, then you would set the spout > parallelism to 5. To increase parallelism, you would need to increase the > number of partitions for your topic (by using the add partitions utility). > > As for the number of workers, setting it to 1 means that your spout will > only run on a single Storm node, and would likely share resources with > other Storm processes (spouts and bolts). I recommend to increase the > number of workers so Storm has a chance to spread out the work, and keep a > good balance. > > Hope this helps. > > Chi > > > On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh > wrote: > >> I am in the process of optimizing my stream. Currently I expect 5 000 000 >> tuples to come out of my spout per minute. I am trying to beef up my >> topology in order to process this in real time without falling behind. >> >> For some reason my batch size is capping out at 83 thousand tuples. I >> can't seem to make it any bigger. the processing time doesn't seem to get >> any smaller than 2-3 seconds either. >> I'm not sure how to configure the topology to get any faster / more >> efficient. 
>> >> Currently all the topology does is a groupby on time and an aggregation >> (Count) to aggregate everything. >> >> Here are some data points i've figured out. >> >> Batch Size:5mb >> num-workers: 1 >> parallelismHint: 2 >> (I'll write this a 5mb, 1, 2) >> >> 5mb, 1, 2 = 83K tuples / 6s >> 10mb, 1, 2 = 83k / 7s >> 5mb, 1, 4 = 83k / 6s >> 5mb, 2, 4 = 83k / 3s >> 5mb, 3, 6 = 83k / 3s >> 10mb, 3, 6 = 83k / 3s >> >> Can anybody help me figure out how to get it to process things faster ? >> >> My maxSpoutPending is at 1, but when I increased it to 2 it was the same. >> MessageTimeoutSec = 100 >> >> I've been following this blog: https://gist.github.com/mrflip/5958028 >> to an extent, not everything word for word though. >> >> I need to be able to process around 66,000 tuples per second and I'm >> starting to run out of ideas. >> >> Thanks >> >> -- >> Raphael Hsieh >> >> >> > > > -- Raphael Hsieh
Optimizing Kafka Stream
I am in the process of optimizing my stream. Currently I expect 5,000,000 tuples to come out of my spout per minute, and I am trying to beef up my topology in order to process this in real time without falling behind.

For some reason my batch size is capping out at 83 thousand tuples; I can't seem to make it any bigger. The processing time doesn't seem to get any smaller than 2-3 seconds either. I'm not sure how to configure the topology to get any faster / more efficient.

Currently all the topology does is a groupBy on time and an aggregation (Count) to aggregate everything.

Here are some data points I've figured out:

Batch size: 5MB
num-workers: 1
parallelismHint: 2
(I'll write this as 5MB, 1, 2)

5MB, 1, 2 = 83K tuples / 6s
10MB, 1, 2 = 83K / 7s
5MB, 1, 4 = 83K / 6s
5MB, 2, 4 = 83K / 3s
5MB, 3, 6 = 83K / 3s
10MB, 3, 6 = 83K / 3s

Can anybody help me figure out how to get it to process things faster?

My maxSpoutPending is at 1, but when I increased it to 2 it was the same. MessageTimeoutSec = 100.

I've been following this blog: https://gist.github.com/mrflip/5958028 to an extent, though not word for word.

I need to be able to process around 66,000 tuples per second and I'm starting to run out of ideas.

Thanks -- Raphael Hsieh
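For reference, the knobs mentioned in this thread (workers, max spout pending, message timeout) are all set on the topology Config; a minimal sketch, with the values from the thread filled in as examples (the numbers themselves are just illustrations, not recommendations):

```java
Config config = new Config();
config.setNumWorkers(3);             // spread the topology across more worker JVMs
config.setMaxSpoutPending(1);        // how many batches may be in flight at once
config.setMessageTimeoutSecs(100);   // how long before an unacked batch is failed/replayed

// parallelism, by contrast, is set per-operation on the stream itself:
//   .aggregate(new Count(), new Fields("count")).parallelismHint(6)
```

As Chi notes in the replies, spout parallelism beyond the Kafka partition count buys nothing, so raising parallelismHint only helps downstream of the spout unless partitions are added.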
Re: Position in Kafka Stream
Thanks Tyson! This blog is super helpful. I've been able to get LoggingMetrics working to an extent, however if I try to create multiple CountMetrics in the same function, I only see one show up in my NimbusUI. Does anybody know why this is ? On Thu, May 29, 2014 at 8:57 AM, Tyson Norris wrote: > I found this blog helpful: > http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to > > Best regards, > Tyson > > On May 29, 2014, at 8:41 AM, Raphael Hsieh wrote: > > Can someone explain to me what LoggingMetrics is ? > I've heard of it and people have told me to use it, but I can't find any > documentation on it or any resources on how to use it. > > Thanks > > > On Thu, May 29, 2014 at 12:06 AM, Tyson Norris wrote: > >> Hi - >> Thanks - it turns out that the JSON parsing is actually fine with HEAD, >> although inaccurate without the required message format (comments mention >> expecting an “s” property with timestamp value). >> >> My problem was that I was not specifying the spout root properly, >> i.e. --spoutroot /transactional//user/ (in my case I had >> specified a path that was valid, but not a spout) >> >> Now I get offset info properly via monitor.py - Thanks! >> >> Tyson >> >> On May 28, 2014, at 10:12 AM, Cody A. Ray wrote: >> >> Right, its trying to read your kafka messages and parse as JSON. See >> the error: >> >> simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1 >> (char 0) >> >> If you want to use the BrightTag branch, you'll need to go a couple >> commits back. 
Try this: >> >> git clone https://github.com/BrightTag/stormkafkamon >> git checkout 07eede9ec72329fe2cad893d087541b583e11148 >> >> -Cody >> >> >> On Wed, May 28, 2014 at 10:39 AM, Tyson Norris wrote: >> >>> Thanks Cody - >>> I tried the BrightTag fork and still have problems with >>> storm 0.9.1-incubating and kafka 0.8.1, I get an error with my trident >>> topology (haven’t tried non-trident yet): >>> (venv)tnorris-osx:stormkafkamon tnorris$ ./monitor.py --topology >>> TrendingTagTopology --spoutroot storm --friendly >>> Traceback (most recent call last): >>> File "./monitor.py", line 112, in >>> sys.exit(main()) >>> File "./monitor.py", line 96, in main >>> zk_data = process(zc.spouts(options.spoutroot, options.topology)) >>> File "/git/github/stormkafkamon/stormkafkamon/zkclient.py", line 76, >>> in spouts >>> j = json.loads(self.client.get(self._zjoin([spout_root, c, p]))[0]) >>> File >>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/__init__.py", >>> line 501, in loads >>> return _default_decoder.decode(s) >>> File >>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py", >>> line 370, in decode >>> obj, end = self.raw_decode(s) >>> File >>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py", >>> line 389, in raw_decode >>> return self.scan_once(s, idx=_w(s, idx).end()) >>> simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1 >>> (char 0) >>> (venv)tnorris-osx:stormkafkamon tnorris$ >>> >>> I’m not too familiar with python but will try to debug it as time >>> allows - let me know if you have advice. >>> >>> Thanks >>> Tyson >>> >>> >>> >>> >>> On May 28, 2014, at 7:20 AM, Cody A. Ray wrote: >>> >>> You can also use stormkafkamon to track this stuff. Its not good for >>> historical analysis like graphite/ganglia, but its good if you just want to >>> see how things currently stand. 
>>> >>> The original: https://github.com/otoolep/stormkafkamon >>> >>> This didn't work for us without some updates (incompatibility with the >>> latest python-kafka dep). Here are those updates: >>> https://github.com/BrightTag/stormkafkamon/commit/07eede9ec72329fe2cad893d087541b583e11148 >>> >>> (Our branch has a couple more things that parse the kafka messages >>> with our format (which embeds a timestamp) to determine how long (in time) >>> storm is behind... planning to clean that up soon so it can be a bit more >>> reusable) >>> >>> https://github.com/BrightTag/stormkafkamon >>> >>> -Cody >>>
Re: Nimbus UI fields
What might cause a tuple to be 'Acked' vs just 'Executed'? How should I interpret these values? Thanks On Tue, May 20, 2014 at 9:20 PM, Cody A. Ray wrote: > The two bolts which emit/transfer 0 are likely your persistentAggregate > bolts. These are *sinks* so they don't logically emit/transfer tuples any > farther. > > You can add a name which will show up in the UI to help you see how > Trident compiles into your Storm topology. > > .name("Aggregator 1") > .persistentAggregate(...) > > .name("Aggregator 2") > .persistentAggregate(...) > > -Cody > > > On Tue, May 20, 2014 at 7:38 PM, Harsha wrote: > >> Executed refers to number of incoming tuples processed. >> >> capacity is determined by (executed * latency) / window (time duration). >> >> UI should give you description of those stats if you hover over table >> headers. >> >> >> >> >> On Tue, May 20, 2014, at 03:36 PM, Raphael Hsieh wrote: >> >> I reattached the previous image in case it was too difficult to read >> before >> >> >> On Tue, May 20, 2014 at 3:31 PM, Raphael Hsieh >> wrote: >> >> Hi I'm confused as to what each field in the StormUI represents and how >> to use the information. >> [image: Inline image 1] >> >> The bolts I have above are formed from trident. This is what operations I >> believe each bolt represents >> b-0 : .each(function) -> .each(filter) >> b-1 : .aggregate >> --split-- >> b-2 : .persistentAggregate >> b-3 : .persistentAggregate >> >> What does it mean for the first two bolts to emit and transfer 0 ? >> What is the Capacity field ? What does that represent ? >> Does Execute refer to the tuples acked and successfully processed? >> >> Thanks >> -- >> Raphael Hsieh >> >> >> >> >> >> >> >> -- >> Raphael Hsieh >> >> >> >> >> Email had 2 attachments: >> >>- image.png >>- 41k (image/png) >>- NimbusUI.PNG >>- 22k (image/png) >> >> > > > -- > Cody A. Ray, LEED AP > cody.a@gmail.com > 215.501.7891 > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Re: Position in Kafka Stream
Can someone explain to me what LoggingMetrics is ? I've heard of it and people have told me to use it, but I can't find any documentation on it or any resources on how to use it. Thanks On Thu, May 29, 2014 at 12:06 AM, Tyson Norris wrote: > Hi - > Thanks - it turns out that the JSON parsing is actually fine with HEAD, > although inaccurate without the required message format (comments mention > expecting an “s” property with timestamp value). > > My problem was that I was not specifying the spout root properly, > i.e. --spoutroot /transactional//user/ (in my case I had > specified a path that was valid, but not a spout) > > Now I get offset info properly via monitor.py - Thanks! > > Tyson > > On May 28, 2014, at 10:12 AM, Cody A. Ray wrote: > > Right, its trying to read your kafka messages and parse as JSON. See > the error: > > simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1 > (char 0) > > If you want to use the BrightTag branch, you'll need to go a couple > commits back. 
Try this: > > git clone https://github.com/BrightTag/stormkafkamon > git checkout 07eede9ec72329fe2cad893d087541b583e11148 > > -Cody > > > On Wed, May 28, 2014 at 10:39 AM, Tyson Norris wrote: > >> Thanks Cody - >> I tried the BrightTag fork and still have problems with >> storm 0.9.1-incubating and kafka 0.8.1, I get an error with my trident >> topology (haven’t tried non-trident yet): >> (venv)tnorris-osx:stormkafkamon tnorris$ ./monitor.py --topology >> TrendingTagTopology --spoutroot storm --friendly >> Traceback (most recent call last): >> File "./monitor.py", line 112, in >> sys.exit(main()) >> File "./monitor.py", line 96, in main >> zk_data = process(zc.spouts(options.spoutroot, options.topology)) >> File "/git/github/stormkafkamon/stormkafkamon/zkclient.py", line 76, in >> spouts >> j = json.loads(self.client.get(self._zjoin([spout_root, c, p]))[0]) >> File >> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/__init__.py", >> line 501, in loads >> return _default_decoder.decode(s) >> File >> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py", >> line 370, in decode >> obj, end = self.raw_decode(s) >> File >> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py", >> line 389, in raw_decode >> return self.scan_once(s, idx=_w(s, idx).end()) >> simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1 >> (char 0) >> (venv)tnorris-osx:stormkafkamon tnorris$ >> >> I’m not too familiar with python but will try to debug it as time >> allows - let me know if you have advice. >> >> Thanks >> Tyson >> >> >> >> >> On May 28, 2014, at 7:20 AM, Cody A. Ray wrote: >> >> You can also use stormkafkamon to track this stuff. Its not good for >> historical analysis like graphite/ganglia, but its good if you just want to >> see how things currently stand. 
>> >> The original: https://github.com/otoolep/stormkafkamon >> >> This didn't work for us without some updates (incompatibility with the >> latest python-kafka dep). Here are those updates: >> https://github.com/BrightTag/stormkafkamon/commit/07eede9ec72329fe2cad893d087541b583e11148 >> >> (Our branch has a couple more things that parse the kafka messages with >> our format (which embeds a timestamp) to determine how long (in time) storm >> is behind... planning to clean that up soon so it can be a bit more >> reusable) >> >> https://github.com/BrightTag/stormkafkamon >> >> -Cody >> >> >> On Wed, May 28, 2014 at 4:50 AM, Danijel Schiavuzzi < >> dani...@schiavuzzi.com> wrote: >> >>> Yes, Trident Kafka spouts give you the same metrics. Take a look at the >>> code to find out what's available. >>> >>> >>> On Wed, May 28, 2014 at 3:55 AM, Tyson Norris wrote: >>> >>>> Do Trident variants of kafka spouts do something similar? >>>> Thanks >>>> Tyson >>>> >>>> > On May 27, 2014, at 3:19 PM, "Harsha" wrote: >>>> > >>>> > Raphael, >>>> >kafka spout sends metrics for kafkaOffset and kafkaPartition >>>> you can look at those by using LoggingMetrics or setting up a ganglia. >>>> Kafka uses its own zookeeper to store state info per topic & group.id >>>
Re: Trident, ZooKeeper and Kafka
By setting forceFromStart to true, aren't I telling it to start from the beginning or earliest time then ? On Thu, May 29, 2014 at 12:59 AM, Danijel Schiavuzzi wrote: > You must set both forceFromStart to true and startOffsetTime to -1 or -2. > > > On Thu, May 29, 2014 at 12:23 AM, Raphael Hsieh > wrote: > >> I'm doing both tridentKafkaConfig.forceFromStart = false; as well as >> tridentKafkaConfig.startOffsetTime = -1; >> >> Neither are working for me. Looking at my nimbus UI, I still get a large >> spike in processed data, before it levels off and seems to not process >> anything. >> >> >> >> On Wed, May 28, 2014 at 3:17 PM, Shaikh Riyaz >> wrote: >> >>> I think you can use kafkaConfig.forceFromStart = *false*; >>> >>> We have implemented this and its working fine. >>> >>> Regards, >>> Riyaz >>> >>> >>> >>> On Thu, May 29, 2014 at 1:02 AM, Raphael Hsieh >>> wrote: >>> >>>> This is still not working for me. I've set the offset to -1 and it is >>>> still backfilling data. >>>> Is there any documentation on the start offsets that I could take a >>>> look at ? >>>> Or even documentation on kafka.api.OffsetRequest.LatestTime() ? >>>> >>>> >>>> On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh >>>> wrote: >>>> >>>>> would the Trident version of this be >>>>> tridentKafkaConfig.startOffsetTime ? >>>>> >>>>> >>>>> On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi < >>>>> dani...@schiavuzzi.com> wrote: >>>>> >>>>>> By default, the Kafka spout resumes consuming where it last left off. >>>>>> That offset is stored in ZooKeeper. >>>>>> >>>>>> You can set forceStartOffset to -2 to start consuming from the >>>>>> earliest available offset, or -1 to start consuming from the latest >>>>>> available offset. >>>>>> >>>>>> >>>>>> On Wednesday, May 28, 2014, Raphael Hsieh >>>>>> wrote: >>>>>> >>>>>>> If I don't tell trident to start consuming data from the beginning >>>>>>> of the Kafka stream, where does it start from? 
>>>>>>> If I were to do: >>>>>>>tridentKafkaConfig.forceFromStart = true; >>>>>>> Then it will tell the spout to start consuming from the start of the >>>>>>> stream. If that is not set, then where does it start consuming from? and >>>>>>> How might I go about telling it to start consuming from the very end of >>>>>>> the >>>>>>> stream? >>>>>>> >>>>>>> If a disaster were to happen and all my hosts died, when I start my >>>>>>> cluster back up, it might start consuming from where it left off. I >>>>>>> would >>>>>>> rather manually process that old data, and have my storm system start >>>>>>> processing the live data. >>>>>>> >>>>>>> Thanks >>>>>>> -- >>>>>>> Raphael Hsieh >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Danijel Schiavuzzi >>>>>> >>>>>> E: dani...@schiavuzzi.com >>>>>> W: www.schiavuzzi.com >>>>>> T: +385989035562 >>>>>> Skype: danijels7 >>>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Raphael Hsieh >>>>> Amazon.com >>>>> Software Development Engineer I >>>>> (978) 764-9014 >>>>> >>>>> >>>>> >>>>> >>>> >>>> >>>> >>>> -- >>>> Raphael Hsieh >>>> Amazon.com >>>> Software Development Engineer I >>>> (978) 764-9014 >>>> >>>> >>>> >>>> >>> >>> >>> >>> -- >>> Regards, >>> >>> Riyaz >>> >>> >> >> >> -- >> Raphael Hsieh >> >> >> > > > > -- > Danijel Schiavuzzi > > E: dani...@schiavuzzi.com > W: www.schiavuzzi.com > T: +385989035562 > Skype: danijels7 > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Re: Different ZooKeeper Cluster for storm and kafka ?
Never mind I figured this out. Thanks On Wed, May 28, 2014 at 3:59 PM, Raphael Hsieh wrote: > Hi I believe it is possible to have my Storm topology run on a different > ZooKeeper cluster than the source of my data (this case being Kafka). I > cannot seem to find documentation on how to do this? > > Can anybody help me figure this out? Or point me to some docs that will > explain this? > > Thanks > -- > Raphael Hsieh > > > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Different ZooKeeper Cluster for storm and kafka ?
Hi I believe it is possible to have my Storm topology run on a different ZooKeeper cluster than the source of my data (this case being Kafka). I cannot seem to find documentation on how to do this? Can anybody help me figure this out? Or point me to some docs that will explain this? Thanks -- Raphael Hsieh
Re: Trident, ZooKeeper and Kafka
I'm doing both tridentKafkaConfig.forceFromStart = false; as well as tridentKafkaConfig.startOffsetTime = -1; Neither are working for me. Looking at my nimbus UI, I still get a large spike in processed data, before it levels off and seems to not process anything. On Wed, May 28, 2014 at 3:17 PM, Shaikh Riyaz wrote: > I think you can use kafkaConfig.forceFromStart = *false*; > > We have implemented this and its working fine. > > Regards, > Riyaz > > > > On Thu, May 29, 2014 at 1:02 AM, Raphael Hsieh wrote: > >> This is still not working for me. I've set the offset to -1 and it is >> still backfilling data. >> Is there any documentation on the start offsets that I could take a look >> at ? >> Or even documentation on kafka.api.OffsetRequest.LatestTime() ? >> >> >> On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh wrote: >> >>> would the Trident version of this be >>> tridentKafkaConfig.startOffsetTime ? >>> >>> >>> On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi < >>> dani...@schiavuzzi.com> wrote: >>> >>>> By default, the Kafka spout resumes consuming where it last left off. >>>> That offset is stored in ZooKeeper. >>>> >>>> You can set forceStartOffset to -2 to start consuming from the earliest >>>> available offset, or -1 to start consuming from the latest available >>>> offset. >>>> >>>> >>>> On Wednesday, May 28, 2014, Raphael Hsieh wrote: >>>> >>>>> If I don't tell trident to start consuming data from the beginning of >>>>> the Kafka stream, where does it start from? >>>>> If I were to do: >>>>>tridentKafkaConfig.forceFromStart = true; >>>>> Then it will tell the spout to start consuming from the start of the >>>>> stream. If that is not set, then where does it start consuming from? and >>>>> How might I go about telling it to start consuming from the very end of >>>>> the >>>>> stream? >>>>> >>>>> If a disaster were to happen and all my hosts died, when I start my >>>>> cluster back up, it might start consuming from where it left off. 
I would >>>>> rather manually process that old data, and have my storm system start >>>>> processing the live data. >>>>> >>>>> Thanks >>>>> -- >>>>> Raphael Hsieh >>>>> >>>>> >>>>> >>>>> >>>> >>>> >>>> -- >>>> Danijel Schiavuzzi >>>> >>>> E: dani...@schiavuzzi.com >>>> W: www.schiavuzzi.com >>>> T: +385989035562 >>>> Skype: danijels7 >>>> >>> >>> >>> >>> -- >>> Raphael Hsieh >>> Amazon.com >>> Software Development Engineer I >>> (978) 764-9014 >>> >>> >>> >>> >> >> >> >> -- >> Raphael Hsieh >> Amazon.com >> Software Development Engineer I >> (978) 764-9014 >> >> >> >> > > > > -- > Regards, > > Riyaz > > -- Raphael Hsieh
Re: logging 'failed' tuples in mastercoord-bg0
Thanks for your help, Taylor. Do you think you could point me to some documentation on where I can set those values in Storm Trident? I can't seem to find anything or figure that out. Thanks On Wed, May 28, 2014 at 2:58 PM, P. Taylor Goetz wrote: > "Silent replays" are usually a sign of batches timing out. > > By default storm uses a timeout value of thirty seconds. > > Try upping that value and setting TOPOLOGY_SPOUT_MAX_PENDING to a very low > value like 1. In trident that controls how many batches can be in-flight at > a time. > > -Taylor > > > On May 28, 2014, at 5:24 PM, Raphael Hsieh wrote: > > > > Is there a way to add logging to the master coordinator? > > For some reason my spout is failing batches and not giving me any error > messages. > > The log files on the host don't provide any error messages and I'm not > sure where the logic for this resides in Storm-Trident. > > > > Is there a particular string other than 'failed' that I can grep for? > > > > Thanks > > -- > > Raphael Hsieh > > > > > -- Raphael Hsieh
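For reference, the two values Taylor mentions are ordinary topology config settings; a sketch of where they can be set before submitting the topology (topology name is a placeholder):

```java
Config config = new Config();
// Raise the batch timeout (Storm's default is 30 seconds):
config.setMessageTimeoutSecs(120);
// Limit in-flight batches, as Taylor suggests:
config.setMaxSpoutPending(1);
// The same settings via their raw config keys:
//   config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 120);
//   config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
StormSubmitter.submitTopology("my-topology", config, topology.build());
```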
Re: Trident, ZooKeeper and Kafka
This is still not working for me. I've set the offset to -1 and it is still backfilling data. Is there any documentation on the start offsets that I could take a look at ? Or even documentation on kafka.api.OffsetRequest.LatestTime() ? On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh wrote: > would the Trident version of this be > tridentKafkaConfig.startOffsetTime ? > > > On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi < > dani...@schiavuzzi.com> wrote: > >> By default, the Kafka spout resumes consuming where it last left off. >> That offset is stored in ZooKeeper. >> >> You can set forceStartOffset to -2 to start consuming from the earliest >> available offset, or -1 to start consuming from the latest available offset. >> >> >> On Wednesday, May 28, 2014, Raphael Hsieh wrote: >> >>> If I don't tell trident to start consuming data from the beginning of >>> the Kafka stream, where does it start from? >>> If I were to do: >>>tridentKafkaConfig.forceFromStart = true; >>> Then it will tell the spout to start consuming from the start of the >>> stream. If that is not set, then where does it start consuming from? and >>> How might I go about telling it to start consuming from the very end of the >>> stream? >>> >>> If a disaster were to happen and all my hosts died, when I start my >>> cluster back up, it might start consuming from where it left off. I would >>> rather manually process that old data, and have my storm system start >>> processing the live data. >>> >>> Thanks >>> -- >>> Raphael Hsieh >>> >>> >>> >>> >> >> >> -- >> Danijel Schiavuzzi >> >> E: dani...@schiavuzzi.com >> W: www.schiavuzzi.com >> T: +385989035562 >> Skype: danijels7 >> > > > > -- > Raphael Hsieh > Amazon.com > Software Development Engineer I > (978) 764-9014 > > > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
logging 'failed' tuples in mastercoord-bg0
Is there a way to add logging to the master coordinator? For some reason my spout is failing batches and not giving me any error messages. The log files on the host don't provide any error messages and I'm not sure where the logic for this resides in Storm-Trident. Is there a particular string other than 'failed' that I can grep for? Thanks -- Raphael Hsieh
Specify a different ZooKeeper Cluster
When using Storm Trident with a kafka spout, I need to give it a ZooKeeper cluster and the broker path so that it can know where to find the Kafka hosts and get data. How would I go about specifying a different ZooKeeper cluster for storm to use? Thanks -- Raphael Hsieh
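A sketch of how the two clusters are typically separated, assuming the storm-kafka API: the spout's ZkHosts points at Kafka's own ZooKeeper, while Trident's transactional state can be directed at a different ZooKeeper through the transactional.* config keys (all hosts below are placeholders):

```java
// Kafka's ZooKeeper: where the spout discovers brokers and partitions.
BrokerHosts kafkaZk = new ZkHosts("kafka-zk1:2181,kafka-zk2:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(kafkaZk, "my-topic");

// Storm's transactional state (batch/offset bookkeeping) can live in a
// separate ZooKeeper ensemble:
Config config = new Config();
config.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS,
           Arrays.asList("storm-zk1", "storm-zk2"));
config.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
```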
Re: Trident, ZooKeeper and Kafka
would the Trident version of this be tridentKafkaConfig.startOffsetTime ? On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi wrote: > By default, the Kafka spout resumes consuming where it last left off. That > offset is stored in ZooKeeper. > > You can set forceStartOffset to -2 to start consuming from the earliest > available offset, or -1 to start consuming from the latest available offset. > > > On Wednesday, May 28, 2014, Raphael Hsieh wrote: > >> If I don't tell trident to start consuming data from the beginning of the >> Kafka stream, where does it start from? >> If I were to do: >>tridentKafkaConfig.forceFromStart = true; >> Then it will tell the spout to start consuming from the start of the >> stream. If that is not set, then where does it start consuming from? and >> How might I go about telling it to start consuming from the very end of the >> stream? >> >> If a disaster were to happen and all my hosts died, when I start my >> cluster back up, it might start consuming from where it left off. I would >> rather manually process that old data, and have my storm system start >> processing the live data. >> >> Thanks >> -- >> Raphael Hsieh >> >> >> >> > > > -- > Danijel Schiavuzzi > > E: dani...@schiavuzzi.com > W: www.schiavuzzi.com > T: +385989035562 > Skype: danijels7 > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Trident, ZooKeeper and Kafka
If I don't tell trident to start consuming data from the beginning of the Kafka stream, where does it start from? If I were to do: tridentKafkaConfig.forceFromStart = true; Then it will tell the spout to start consuming from the start of the stream. If that is not set, then where does it start consuming from? and How might I go about telling it to start consuming from the very end of the stream? If a disaster were to happen and all my hosts died, when I start my cluster back up, it might start consuming from where it left off. I would rather manually process that old data, and have my storm system start processing the live data. Thanks -- Raphael Hsieh
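The start-offset behavior discussed in the replies can be sketched like this, assuming the storm-kafka TridentKafkaConfig field names (they have varied across versions):

```java
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zkHosts, "my-topic");

// Start from the very end of the stream, ignoring any offset stored in ZooKeeper:
spoutConf.forceFromStart = true;
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();   // -1

// Or start from the earliest offset Kafka still retains:
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // -2

// With forceFromStart = false, the spout resumes from the offset in ZooKeeper,
// so startOffsetTime is only consulted when forceFromStart is true.
```

The last comment is the likely reason setting startOffsetTime = -1 alone has no visible effect: without forceFromStart, the stored offset wins.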
Position in Kafka Stream
Is there a way to tell where in the kafka stream my topology is starting from? From my understanding, Storm will use ZooKeeper in order to tell its place in the Kafka stream. Where can I find metrics on this? How can I see how large the stream is, how much data is sitting in it, and what the most recent/oldest positions are? Thanks -- Raphael Hsieh
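Two command-line approaches can help here, both offered as sketches (script paths, topic names, and the ZooKeeper layout are examples and vary by Kafka/Storm version): Kafka ships a GetOffsetShell tool that reports the earliest/latest offsets per partition, and the Trident Kafka spout stores its own position under its ZooKeeper spout root, which can be read with zkCli:

```
# Latest offset per partition (how far the stream currently goes):
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list kafka1:9092 --topic my-topic --time -1

# Earliest offset still retained by the brokers:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list kafka1:9092 --topic my-topic --time -2

# Where the Trident Kafka spout keeps its position (path is an example):
bin/zkCli.sh -server storm-zk1:2181 get /transactional/<spout-id>/user/<partition>
```

The difference between the latest broker offset and the spout's stored offset is the backlog; the stormkafkamon tool mentioned elsewhere in this list automates exactly that comparison.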
Batches per second
Is there a way to tell how many batches per second are being processed by my topology? Thanks -- Raphael Hsieh
PersistentAggregate
From my understanding, PersistentAggregate should first aggregate the batch, then once the batch has finished aggregating, send it to whatever datastore is specified. Is this the case? Or will the PersistentAggregate use the external datastore in order to do the aggregations? -- Raphael Hsieh
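As a point of reference, a minimal sketch of how persistentAggregate is wired, using Trident's built-in MemoryMapState purely for illustration: Trident first runs the aggregator over the batch inside the topology, then reads the existing values for the touched keys from the state (multiGet), combines them with the batch results, and writes them back (multiPut). So the external store holds the running totals, but the per-batch aggregation happens before the store is touched:

```java
topology.newStream("spout", spout)
        .groupBy(new Fields("time"))
        // The Count runs over each batch in the topology first; the batch
        // results are then merged into the backing state via multiGet/multiPut.
        .persistentAggregate(new MemoryMapState.Factory(),
                             new Count(),
                             new Fields("count"));
```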
Re: $mastercoord-bg0
Also is there any way to look at logs for this master coordinator spout ? I'd like to be able to see what exactly is failing. Currently my mastercoord-bg0 spout says that there are 20 things failing, but there are no error messages displayed. and I don't see anything when I ssh into the box and look at logs there. On Wed, May 21, 2014 at 4:47 PM, Raphael Hsieh wrote: > what does the $mastercoord-bg0 represent ? It seems to have much less work > that my bolt spout. Also how can I set the parallelism of this master spout > ? > > when my other bolts are emitting and acking millions of tuples, this > master spout only emits/acks a couple dozen. > > Also currently something in my code is broken and the mastercoord spout is > failing 20, and nothing is being passed through. > > Is this mastercoord-bg0 spout acking batches ? How might I go about > troubleshooting this to figure out why it is broken ? > > Thanks > -- > Raphael Hsieh > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
$mastercoord-bg0
What does the $mastercoord-bg0 represent? It seems to have much less work than my bolt spout. Also, how can I set the parallelism of this master spout? When my other bolts are emitting and acking millions of tuples, this master spout only emits/acks a couple dozen. Also, currently something in my code is broken and the mastercoord spout is failing 20, and nothing is being passed through. Is this mastercoord-bg0 spout acking batches? How might I go about troubleshooting this to figure out why it is broken? Thanks -- Raphael Hsieh
Nimbus UI fields
Hi, I'm confused as to what each field in the Storm UI represents and how to use the information. [image: Inline image 1] The bolts I have above are formed from Trident. These are the operations I believe each bolt represents: b-0 : .each(function) -> .each(filter) b-1 : .aggregate --split-- b-2 : .persistentAggregate b-3 : .persistentAggregate What does it mean for the first two bolts to emit and transfer 0 ? What is the Capacity field ? What does that represent ? Does Execute refer to the tuples acked and successfully processed? Thanks -- Raphael Hsieh
Are batches processed sequentially or in parallel?
In Storm Trident, are batches processed sequentially? Or are they all processed in parallel? If they are processed in parallel, how does it handle multiple writers to a datastore ? I can understand this making sense for append-only implementations, but for cases where we are updating values in a database, how does it make sure that values are written and committed to the database before another thread reads them and tries to update them with different data? Thanks -- Raphael Hsieh
Re: Memcached
Also, how fault-tolerant is using the MemcachedState? If a host/worker node dies, does its in-memory map get lost forever ? Or is this map distributed among worker nodes ? On Mon, Apr 28, 2014 at 3:57 PM, Raphael Hsieh wrote: > How would one pull data from a Memcached in a separate process ? > > I see the examples for how to put data into memcached, but what is the > purpose of doing so ? > > How might I have a different process pulling data from the memcache every > X seconds, and pushing it to a different external datastore ? > > -- > Raphael Hsieh > > > > -- Raphael Hsieh
Memcached
How would one pull data from a Memcached in a separate process ? I see the examples for how to put data into memcached, but what is the purpose of doing so ? How might I have a different process pulling data from the memcache every X seconds, and pushing it to a different external datastore ? -- Raphael Hsieh
Multiple writers to datastore ?
How does Storm handle multiple writers writing data to an external datastore ? If there are multiple workers working to process the data, each of them must also do the "write" to the database. If there are multiple writers to the database that are storing data, how does it make sure that the writers don't overwrite each other's data ? I understand that it will read from the datastore first and check the batchTxId. However, suppose two writers look at a previous batchID, let's pretend it's 26. Now batches 27 and 28 want to write continued aggregate data. 27 increments on 26. 28 increments on 26. 27 writes first, then 28. However, 28 did not aggregate on top of 27's aggregate, and hence the final data in the datastore is wrong. How does Storm handle this ? -- Raphael Hsieh
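For what it's worth, Trident sidesteps this interleaving by committing batches to the state strictly in txid order (the master batch coordinator sequences commits), so batch 28 always sees batch 27's write; the stored txid exists to make *replays* idempotent, not to arbitrate concurrent writers. Below is a simplified, Storm-free model of the opaque-state update rule -- the class and method names are mine, not Trident's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of Trident's opaque-state update rule. Each stored
// value remembers the txid of the batch that wrote it plus the value as
// it was *before* that batch, so a replayed batch can be re-applied
// without double counting. Not Storm's actual classes or interfaces.
public class OpaqueCounter {
    static class OpaqueValue {
        long txid;
        long prev; // value before txid's batch was applied
        long curr; // value after txid's batch was applied
        OpaqueValue(long txid, long prev, long curr) {
            this.txid = txid; this.prev = prev; this.curr = curr;
        }
    }

    private final Map<String, OpaqueValue> store = new HashMap<>();

    // Apply one batch's partial sum for a key. Batches commit in txid
    // order; a replay of the same txid recomputes from prev instead of
    // stacking on top of its own earlier write.
    public void applyBatch(String key, long txid, long delta) {
        OpaqueValue v = store.get(key);
        if (v == null) {
            store.put(key, new OpaqueValue(txid, 0, delta));
        } else if (v.txid == txid) {
            // Replay of the same batch: redo it from the pre-batch value.
            v.curr = v.prev + delta;
        } else {
            // New batch: current value becomes the new "previous".
            v.prev = v.curr;
            v.curr = v.curr + delta;
            v.txid = txid;
        }
    }

    public long get(String key) {
        OpaqueValue v = store.get(key);
        return v == null ? 0 : v.curr;
    }
}
```

Because commits are serialized, the "27 and 28 both read 26" race in the question can't occur; the real serialization point is the ordered commit, and the txid check only protects against the same batch being applied twice after a failure.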
Re: Flush aggregated data every X seconds
Thank you very much for your quick reply Corey, Unfortunately I don't believe the TickSpout exists within Trident yet. I have seen the threads discussing the implementation of a sliding window and I've read Michael Noll's blog <http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/> about it as well. I don't need a sliding window so much as just multiple window chunks, if that makes sense haha. What I'm thinking about resorting to is increasing my batch size to be much larger than the throughput of the spout, then at the end of my topology, doing an aggregation such that everything aggregates to a single tuple, and running a ".each" on that single tuple with the function just sleeping for X time. My theory is that this should allow the stream to back up enough such that the next batch takes (roughly) the entire next X time amount of data. Can anyone validate that this technique will work ? On Thu, Apr 24, 2014 at 8:36 PM, Corey Nolet wrote: > Raphael, in your case it sounds like a "TickSpout" could be useful where > you emit a tuple every n time slices and then sleep until needing to emit > another. I'm not sure how that'd work in a Trident aggregator, however. > > I'm not sure if this is something Nathan or the community would approve > of, but I've been writing my own framework for doing sliding/tumbling > windows in Storm that allow aggregations and triggering/eviction by count, > time, and other policies like "when the time difference between the first > item and the last item in a window is less than x". The bolts could easily > be ripped out for doing your own aggregations. > > It's located here: https://github.com/calrissian/flowbox > > It's very much still in the proof-of-concept stage. My other requirement (and > the reason I cared so much to implement this) was that the rules need to be > dynamic and the topology needs to be static so as to make the best use of > resources while users are defining what they need. 
> > > > On Thu, Apr 24, 2014 at 11:27 PM, Raphael Hsieh wrote: > >> Is there a way in Storm Trident to aggregate data over a certain time >> period and have it flush the data out to an external data store after that >> time period is up ? >> >> Trident does not have the functionality of Tick Tuples yet, so I cannot >> use that. Everything I've been researching leads to believe that this is >> not possible in Storm/Trident, however this seems to me to be a fairly >> standard use case of any streaming map reduce library. >> >> For example, >> If I am receiving a stream of integers >> I want to aggregate all those integers over a period of 1 second, then >> persist it into an external datastore. >> >> This is not in order to count how much it will add up to over X amount of >> time, rather I would like to minimize the read/write/updates I do to said >> datastore. >> >> There are many ways in order to reduce these variables, however all of >> them force me to modify my schema in ways that are unpleasant. Also, I >> would rather not have my final external datastore be my scratch space, >> where my program is reading/updating/writing and checking to make sure that >> the transaction id's line up. >> Instead I want that scratch work to be done separately, then the final >> result stored into a final database that no longer needs to do constant >> updating. >> >> Thanks >> -- >> Raphael Hsieh >> >> >> >> > > -- Raphael Hsieh
Flush aggregated data every X seconds
Is there a way in Storm Trident to aggregate data over a certain time period and have it flush the data out to an external data store after that time period is up ? Trident does not have the functionality of Tick Tuples yet, so I cannot use that. Everything I've been researching leads me to believe that this is not possible in Storm/Trident, however this seems to me to be a fairly standard use case of any streaming map-reduce library. For example, if I am receiving a stream of integers, I want to aggregate all those integers over a period of 1 second, then persist it into an external datastore. This is not in order to count how much it will add up to over X amount of time; rather, I would like to minimize the reads/writes/updates I do to said datastore. There are many ways to reduce these variables, however all of them force me to modify my schema in ways that are unpleasant. Also, I would rather not have my final external datastore be my scratch space, where my program is reading/updating/writing and checking to make sure that the transaction id's line up. Instead I want that scratch work to be done separately, then the final result stored into a final database that no longer needs to do constant updating. Thanks -- Raphael Hsieh
Re: PersistentAggregate across batches
Are you saying that there is no purpose in doing a "groupBy" followed by a PersistentAggregate ? The documentation states: "If you run aggregators on a grouped stream, the aggregation will be run within each group instead of against the whole batch." On Wed, Apr 23, 2014 at 2:17 AM, Danijel Schiavuzzi wrote: > > When I do something like >> >> Stream >> .groupBy(new Fields("a")) >> .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", >> "d"), new MyAggregator(), new Fields("resultMap")) >> >> What happens (as described >> here <https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>) >> is the stream is split into different groups based on field "a": >> > > This is not true. The stream will be grouped based on all the keys you > specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d"). This > will produce as many GroupedStreams as there are distinct groupings among > those keys. Those groupings will then be combined/reduced with the existing > values gathered from the IBackingMap#multiGet(), and Trident will then call > multiPut() to persist the updated aggregations back to the underlying data > store. > > Take a look at the Storm sources under the package "storm.trident.*". A > good starting point for understanding Trident would be the Java class > "storm.trident.state.map.TransactionalMap" (or OpaqueMap or > NonTransactionalMap). > > Danijel Schiavuzzi > www.schiavuzzi.com > > > > > >> [image: Grouping] >> like so. >> Then, PartitionPersist will run a multiGet on the fields ("a", "b", "c", >> "d"), since that is what we are using as our keys. So in each of the >> "groups" described above, we would have not only the raw tuples resulting >> from the grouping, but also a single tuple with the result of the previous >> aggregation. 
>> These would all be run through the aggregator, which should be able to >> handle aggregating with this semi-complete aggregation (The "Reduce" >> function in a ReducerAggregator, or the "Combine" function in the >> CombinerAggregator). >> >> How does it know not to treat the previous aggregation as a single new >> tuple? (hence not running the "init" function ? For example if I was >> aggregating a count, having that previous value (say 60) as a single extra >> tuple would only increment the count by 1, instead of 60. >> would I then just need to implement my own "init" function such that it >> has checks for the tuple value, whether it is a raw new tuple, vs a >> previous tuple aggregation? >> >> >> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray wrote: >> >>> My understanding is that the process is >>> 1. multiGet from the IBackingMap is called and returns a value for each >>> key (or null if not present) >>> 2. For each key, the old value from the get and new values in the batch >>> are fed through the aggregator to produce one value per key >>> 3. This value is then stored back into the state through the multiPut in >>> the IBackingMap. >>> >>> If you just want to use nathanmarz's trident-memcached integration, you >>> don't have to write an IBackingMap yourself. The MemcachedState itself >>> implements IBackingMap to do the get and put. To use it, just decide what >>> you want to groupBy (these become your keys) and how you want it aggregated >>> (this is the reduced/combiner implementation). You don't have to write the >>> memcache connection logic or the aggregation logic yourself unless you want >>> to change how it's aggregated or stored. 
>>> I've not used the trident-memcached state in particular, but in general >>> this would look something like this: >>> >>> topology.newStream("spout1", spout1) >>> .groupBy(new Fields("mykeyfield")) >>> .persistentAggregate(MemcachedState.opaque(servers), new >>> Fields("myvaluefield"), new Sum(), new Fields("sum")) >>> >>> (Sorry for any code errors; writing in my phone) >>> >>> Does that answer your question? >>> >>> -Cody >>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" wrote: >>> >>>> The Reducer/Combiner Aggregators hold logic in order to aggregate >>>> across an entire batch, however it does not have
Re: PersistentAggregate across batches
the previous link didn't work, https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams On Tue, Apr 22, 2014 at 10:30 AM, Raphael Hsieh wrote: > Yes partially, > The part I was missing was getting old values and feeding it through the > aggregator again, which still doesn't quite make sense to me. > > I am using an external datastore, so I am not able to use the vanilla > MemcachedState, hence why I am implementing my own version of the > IBackingMap. > > So let me try and explain what I am understanding. > When I do something like > > Stream > .groupBy(new Fields("a") > .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", > "d"), new MyAggregator(), new Fields("resultMap")) > > What happens (as described > here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>) > is the stream is split into different groups based on field "a": > [image: Grouping] > like so. > then, PartitionPersist will run a MultiGet on the fields ("a", "b", "c", > "d"), since that is what we are using as our keys. So in each of the > "groups" described above, we would have not only the raw tuples resulting > from the grouping, but also a single tuple with the result of the previous > aggregation. > These would all be run through the aggregator, which should be able to > handle aggregating with this semi-complete aggregation (The "Reduce" > function in a ReducerAggregator, or the "Combine" function in the > CombinerAggregator). > > How does it know not to treat the previous aggregation as a single new > tuple? (hence not running the "init" function ? For example if I was > aggregating a count, having that previous value (say 60) as a single extra > tuple would only increment the count by 1, instead of 60. > would I then just need to implement my own "init" function such that it > has checks for the tuple value, whether it is a raw new tuple, vs a > previous tuple aggregation? > > > On Tue, Apr 22, 2014 at 9:59 AM, Cody A. 
Ray wrote: > >> My understanding is that the process is >> 1. multiGet from the IBackingMap is called and returns a value for each >> key (or null if not present) >> 2. For each key, the old value from the get and new values in the batch >> are fed through the aggregator to produce one value per key >> 3. This value is then stored back into the state through the multiPut in >> the IBackingMap. >> >> If you just want to use nathanmarz's trident-memcached integration, you >> don't have to write an IBackingMap yourself. The MemcachedState itself >> implements IBackingMap to do the get and put. To use it, just decide what >> you want to groupBy (these become your keys) and how you want it aggregated >> (this is the reduced/combiner implementation). You don't have to write the >> memcache connection logic or the aggregation logic yourself unless you want >> to change how it's aggregated or stored. >> I've not used the trident-memcached state in particular, but in general >> this would look something like this: >> >> topology.newStream("spout1", spout1) >> .groupBy(new Fields("mykeyfield")) >> .persistentAggregate(MemcachedState.opaque(servers), new >> Fields("myvaluefield"), new Sum(), new Fields("sum")) >> >> (Sorry for any code errors; writing in my phone) >> >> Does that answer your question? >> >> -Cody >> On Apr 22, 2014 10:32 AM, "Raphael Hsieh" wrote: >> >>> The Reducer/Combiner Aggregators hold logic in order to aggregate across >>> an entire batch, however it does not have the logic to aggregate between >>> batches. >>> In order for this to happen, it must read the previous TransactionId and >>> value from the datastore, determine whether this incoming data is in the >>> right sequence, then increment the value within the datastore. >>> >>> I am asking about this second part. Where the logic goes in order to >>> read previous data from the datastore, and add it to the new incoming >>> aggregate data. >>> >>> >>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. 
Ray wrote: >>> >>>> It's the ReducerAggregator/CombinerAggregator's job to implement this >>>> logic. Look at Count and Sum that are built-in to Trident. You can also >>>> implement your own aggregator. >>>> >>>> -Cody >>>> >>>>
Re: PersistentAggregate across batches
Yes partially, The part I was missing was getting old values and feeding them through the aggregator again, which still doesn't quite make sense to me. I am using an external datastore, so I am not able to use the vanilla MemcachedState, hence why I am implementing my own version of the IBackingMap. So let me try and explain what I am understanding. When I do something like Stream .groupBy(new Fields("a")) .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c", "d"), new MyAggregator(), new Fields("resultMap")) What happens (as described here <https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>) is the stream is split into different groups based on field "a": [image: Grouping] like so. Then, PartitionPersist will run a multiGet on the fields ("a", "b", "c", "d"), since that is what we are using as our keys. So in each of the "groups" described above, we would have not only the raw tuples resulting from the grouping, but also a single tuple with the result of the previous aggregation. These would all be run through the aggregator, which should be able to handle aggregating with this semi-complete aggregation (the "Reduce" function in a ReducerAggregator, or the "Combine" function in the CombinerAggregator). How does it know not to treat the previous aggregation as a single new tuple (hence not running the "init" function)? For example if I was aggregating a count, having that previous value (say 60) as a single extra tuple would only increment the count by 1, instead of 60. Would I then just need to implement my own "init" function such that it has checks for the tuple value, whether it is a raw new tuple vs a previous tuple aggregation? On Tue, Apr 22, 2014 at 9:59 AM, Cody A. 
For each key, the old value from the get and new values in the batch > are fed through the aggregator to produce one value per key > 3. This value is then stored back into the state through the multiPut in > the IBackingMap. > > If you just want to use nathanmarz's trident-memcached integration, you > don't have to write an IBackingMap yourself. The MemcachedState itself > implements IBackingMap to do the get and put. To use it, just decide what > you want to groupBy (these become your keys) and how you want it aggregated > (this is the reduced/combiner implementation). You don't have to write the > memcache connection logic or the aggregation logic yourself unless you want > to change how it's aggregated or stored. > I've not used the trident-memcached state in particular, but in general > this would look something like this: > > topology.newStream("spout1", spout1) > .groupBy(new Fields("mykeyfield")) > .persistentAggregate(MemcachedState.opaque(servers), new > Fields("myvaluefield"), new Sum(), new Fields("sum")) > > (Sorry for any code errors; writing in my phone) > > Does that answer your question? > > -Cody > On Apr 22, 2014 10:32 AM, "Raphael Hsieh" wrote: > >> The Reducer/Combiner Aggregators hold logic in order to aggregate across >> an entire batch, however it does not have the logic to aggregate between >> batches. >> In order for this to happen, it must read the previous TransactionId and >> value from the datastore, determine whether this incoming data is in the >> right sequence, then increment the value within the datastore. >> >> I am asking about this second part. Where the logic goes in order to read >> previous data from the datastore, and add it to the new incoming aggregate >> data. >> >> >> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray wrote: >> >>> Its the ReducerAggregate/CombinerAggregator's job to implement this >>> logic. Look at Count and Sum that are built-in to Trident. You can also >>> implement your own aggregator. 
>>> >>> -Cody >>> >>> >>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh wrote: >>> >>>> If I am using an opaque spout and doing a persistent aggregate to a >>>> MemcachedState, how is it aggregating/incrementing the values across all >>>> batches ? >>>> >>>> I'm wanting to implement an IBackingMap so that I can use an external >>>> datastore. However, I'm unsure where the logic goes that will read the >>>> previous data, and aggregate it with the new data. >>>> >>>> From what I've been told, I need to implement the IBackingMap and the >>>> multiput/multiget functions. So logically, I think it makes sense that I >>>> would put this update logiv in the multiput function. However, the >>>> OpaqueMap class already has multiGet logic in order to check the TxId of >>>> the batch. >>>> Instead of using an OpaqueMap class, should I just make my own >>>> implementation ? >>>> >>>> Thanks >>>> -- >>>> Raphael Hsieh >>>> >>>> >>>> >>>> >>> >>> >>> >>> -- >>> Cody A. Ray, LEED AP >>> cody.a@gmail.com >>> 215.501.7891 >>> >> >> >> >> -- >> Raphael Hsieh >> >> >> >> > -- Raphael Hsieh
Re: PersistentAggregate across batches
The Reducer/Combiner Aggregators hold the logic to aggregate across an entire batch, but they do not have the logic to aggregate between batches. For that to happen, something must read the previous TransactionId and value from the datastore, determine whether this incoming data is in the right sequence, then increment the value within the datastore. I am asking about this second part: where the logic goes in order to read previous data from the datastore, and add it to the new incoming aggregate data. On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray wrote: > It's the ReducerAggregator/CombinerAggregator's job to implement this logic. > Look at Count and Sum that are built-in to Trident. You can also implement > your own aggregator. > > -Cody > > > On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh wrote: > >> If I am using an opaque spout and doing a persistent aggregate to a >> MemcachedState, how is it aggregating/incrementing the values across all >> batches ? >> >> I'm wanting to implement an IBackingMap so that I can use an external >> datastore. However, I'm unsure where the logic goes that will read the >> previous data, and aggregate it with the new data. >> >> From what I've been told, I need to implement the IBackingMap and the >> multiput/multiget functions. So logically, I think it makes sense that I >> would put this update logic in the multiput function. However, the >> OpaqueMap class already has multiGet logic in order to check the TxId of >> the batch. >> Instead of using an OpaqueMap class, should I just make my own >> implementation ? >> >> Thanks >> -- >> Raphael Hsieh >> >> >> >> > > > > -- > Cody A. Ray, LEED AP > cody.a@gmail.com > 215.501.7891 > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
PersistentAggregate across batches
If I am using an opaque spout and doing a persistent aggregate to a MemcachedState, how is it aggregating/incrementing the values across all batches ? I'm wanting to implement an IBackingMap so that I can use an external datastore. However, I'm unsure where the logic goes that will read the previous data, and aggregate it with the new data. From what I've been told, I need to implement the IBackingMap and the multiput/multiget functions. So logically, I think it makes sense that I would put this update logic in the multiput function. However, the OpaqueMap class already has multiGet logic in order to check the TxId of the batch. Instead of using an OpaqueMap class, should I just make my own implementation ? Thanks -- Raphael Hsieh
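In case it helps to see the shape of the control flow being discussed in this thread: the read-old/combine/write-new logic lives in the map wrapper (TransactionalMap/OpaqueMap), not in your IBackingMap. The backing map only does dumb gets and puts; the wrapper fetches old values, feeds them through the aggregator's combine step (never through init), and writes the merged results back. A rough, Storm-free sketch of that flow -- interfaces heavily simplified (no txids, Long sums only), and all names here are mine, not Trident's:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough sketch of how a TransactionalMap-style wrapper combines
// previously stored values with a new batch's partial aggregates.
// The "backing map" only knows multiGet / multiPut; the merge itself
// happens in the wrapper. Not Storm's actual interfaces.
public class MapStateSketch {
    interface BackingMap {
        List<Long> multiGet(List<String> keys);
        void multiPut(List<String> keys, List<Long> vals);
    }

    static class InMemoryBackingMap implements BackingMap {
        private final Map<String, Long> db = new HashMap<>();
        public List<Long> multiGet(List<String> keys) {
            List<Long> out = new ArrayList<>();
            for (String k : keys) out.add(db.get(k)); // null if absent
            return out;
        }
        public void multiPut(List<String> keys, List<Long> vals) {
            for (int i = 0; i < keys.size(); i++) db.put(keys.get(i), vals.get(i));
        }
    }

    // What persistentAggregate does per batch, conceptually:
    // 1) multiGet old values for the batch's keys,
    // 2) combine each old value with the batch's partial aggregate
    //    (the old value enters via the combine step, never via init,
    //    so a stored count of 60 adds 60, not 1),
    // 3) multiPut the merged results.
    static void updateBatch(BackingMap map, Map<String, Long> batchPartials) {
        List<String> keys = new ArrayList<>(batchPartials.keySet());
        List<Long> olds = map.multiGet(keys);
        List<Long> merged = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            Long old = olds.get(i);
            long partial = batchPartials.get(keys.get(i));
            merged.add(old == null ? partial : old + partial); // combine step
        }
        map.multiPut(keys, merged);
    }
}
```

This is also why a custom IBackingMap doesn't need any update logic of its own: implement multiGet/multiPut against your external datastore and let the wrapper do the merging.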
In memory state, drop data after X time
At the end of the documentation here https://github.com/nathanmarz/storm/wiki/Trident-tutorial#state it mentions: States are not required to hold onto state forever. For example, you could have an in-memory State implementation that only keeps the last X hours of data available and drops anything older. Take a look at the implementation of the Memcached integration <https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java> for an example State implementation. How would I go about doing this ? In Trident I haven't found an easy way to store a state for a period of time, then flush it out to an external datastore. -- Raphael Hsieh
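One simple way to get the "only keep the last X hours" behavior the tutorial describes is to timestamp each entry on write and evict anything older than the window on access. A minimal sketch of that eviction idea in plain Java -- this is only the eviction strategy, not a Trident State implementation, and the class is my own invention (the clock is passed in explicitly to keep it testable):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Minimal time-bounded map: entries older than maxAgeMillis are dropped
// lazily on read. A Trident State built around this idea would run the
// eviction inside multiGet/multiPut; this sketch keeps just the core logic.
public class ExpiringMap<K, V> {
    private static class Entry<V> {
        final V value;
        final long writtenAt;
        Entry(V value, long writtenAt) { this.value = value; this.writtenAt = writtenAt; }
    }

    private final Map<K, Entry<V>> map = new HashMap<>();
    private final long maxAgeMillis;

    public ExpiringMap(long maxAgeMillis) { this.maxAgeMillis = maxAgeMillis; }

    public void put(K key, V value, long nowMillis) {
        map.put(key, new Entry<>(value, nowMillis));
    }

    public V get(K key, long nowMillis) {
        evictOlderThan(nowMillis - maxAgeMillis);
        Entry<V> e = map.get(key);
        return e == null ? null : e.value;
    }

    private void evictOlderThan(long cutoff) {
        Iterator<Map.Entry<K, Entry<V>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().writtenAt < cutoff) it.remove();
        }
    }
}
```

For the "flush to an external datastore" half of the question, evicted entries could be handed to a flush callback instead of silently dropped, but that part is left out of the sketch.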
Re: How to think of batches vs partitions
Oh ok, so if I want the guarantee of single message processing when using an external datastore, I need to implement the methods described here <https://github.com/nathanmarz/storm/wiki/Trident-state#opaque-transactional-spouts> myself? On Thu, Apr 17, 2014 at 11:00 AM, Nathan Marz wrote: > aggregate / partitionAggregate are only aggregations within the current > batch; the persistent equivalents are aggregations across all batches. > > The logic for querying states, updating them, and keeping track of batch > ids happens in the states themselves. For example, look at the multiUpdate > method in TransactionalMap: > https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/map/TransactionalMap.java > > Things are structured so that TransactionalMap delegates to an > "IBackingMap" which handles the actual persistence. IBackingMap just has > multiGet and multiPut methods. An implementation for a database (like > Cassandra, Riak, HBase, etc.) just has to implement IBackingMap. > > > On Thu, Apr 17, 2014 at 10:15 AM, Raphael Hsieh wrote: > >> I guess I'm just confused as to when "multiGet" and "multiPut" are called >> when using an implementation of the IBackingMap >> >> >> On Thu, Apr 17, 2014 at 8:33 AM, Raphael Hsieh wrote: >> >>> So from my understanding, this is how the different spout types >>> guarantee single message processing. For example, an opaque transactional >>> spout will look at transaction id's in order to guarantee in-order batch >>> processing, making sure that the txid's are processed in order, and using >>> the previous and current values to fix any mixups. >>> >>> When doing an aggregation does it aggregate across all batches ? If >>> so, how does this happen ? Will it query the datastore for the current >>> value, then add the current aggregate value to the stored value in order to >>> create the global aggregate ? 
Where does this logic happen ? I can't seem >>> to find where this happens in the persistentAggregate or even >>> partitionPersist... >>> >>> >>> On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz wrote: >>> >>>> Batches are processed sequentially, but each batch is partitioned (and >>>> therefore processed in parallel). As a batch is processed, it can be >>>> repartitioned an arbitrary number of times throughout the Trident topology. >>>> >>>> >>>> On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh wrote: >>>> >>>>> Hi I found myself being confused on how to think of Storm/Trident >>>>> processing batches. >>>>> Are batches processed sequentially, but split into multiple partitions >>>>> that are spread throughout the worker nodes ? >>>>> >>>>> Or are batches processed in parrallel and spread among worker nodes to >>>>> be split into partitions within each host running on multiple threads ? >>>>> >>>>> Thanks! >>>>> -- >>>>> Raphael Hsieh >>>>> >>>>> >>>>> >>>>> >>>> >>>> >>>> >>>> -- >>>> Twitter: @nathanmarz >>>> http://nathanmarz.com >>>> >>> >>> >>> >>> -- >>> Raphael Hsieh >>> >>> >>> >> >> >> >> -- >> Raphael Hsieh >> >> >> >> > > > > -- > Twitter: @nathanmarz > http://nathanmarz.com > -- Raphael Hsieh
Re: How to think of batches vs partitions
I guess I'm just confused as to when "multiGet" and "multiPut" are called when using an implementation of the IBackingMap On Thu, Apr 17, 2014 at 8:33 AM, Raphael Hsieh wrote: > So from my understanding, this is how the different spout types guarantee > single message processing. For example, an opaque transactional spout will > look at transaction id's in order to guarantee in order batch processing, > making sure that the txid's are processed in order, and using the previous > and current values to fix any mixups. > > When doing an aggregation does it aggregation across all batches ? If so, > how does this happen ? Will it query the datastore for the current value, > then add the current aggregate value to the stored value in order to create > the global aggregate ? Where does this logic happen ? I can't seem to find > where this happens in the persistentAggregate or even partitionPersist... > > > On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz wrote: > >> Batches are processed sequentially, but each batch is partitioned (and >> therefore processed in parallel). As a batch is processed, it can be >> repartitioned an arbitrary number of times throughout the Trident topology. >> >> >> On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh wrote: >> >>> Hi I found myself being confused on how to think of Storm/Trident >>> processing batches. >>> Are batches processed sequentially, but split into multiple partitions >>> that are spread throughout the worker nodes ? >>> >>> Or are batches processed in parrallel and spread among worker nodes to >>> be split into partitions within each host running on multiple threads ? >>> >>> Thanks! >>> -- >>> Raphael Hsieh >>> >>> >>> >>> >> >> >> >> -- >> Twitter: @nathanmarz >> http://nathanmarz.com >> > > > > -- > Raphael Hsieh > > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
Re: How to think of batches vs partitions
So from my understanding, this is how the different spout types guarantee single message processing. For example, an opaque transactional spout will look at transaction id's in order to guarantee in-order batch processing, making sure that the txid's are processed in order, and using the previous and current values to fix any mixups. When doing an aggregation, does it aggregate across all batches ? If so, how does this happen ? Will it query the datastore for the current value, then add the current aggregate value to the stored value in order to create the global aggregate ? Where does this logic happen ? I can't seem to find where this happens in the persistentAggregate or even partitionPersist... On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz wrote: > Batches are processed sequentially, but each batch is partitioned (and > therefore processed in parallel). As a batch is processed, it can be > repartitioned an arbitrary number of times throughout the Trident topology. > > > On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh wrote: > >> Hi I found myself being confused on how to think of Storm/Trident >> processing batches. >> Are batches processed sequentially, but split into multiple partitions >> that are spread throughout the worker nodes ? >> >> Or are batches processed in parallel and spread among worker nodes to be >> split into partitions within each host running on multiple threads ? >> >> Thanks! >> -- >> Raphael Hsieh >> >> >> >> > > > > -- > Twitter: @nathanmarz > http://nathanmarz.com > -- Raphael Hsieh
How to think of batches vs partitions
Hi, I've found myself confused about how to think of Storm/Trident processing batches. Are batches processed sequentially, but split into multiple partitions that are spread throughout the worker nodes ? Or are batches processed in parallel and spread among worker nodes to be split into partitions within each host running on multiple threads ? Thanks! -- Raphael Hsieh
Re: setting trident transaction window
The documentation for trident aggregations mentions that aggregations are done globally across all batches. https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#aggregation-operations Is this incorrect ? When the documentation says "all batches" it does mean all batches across all worker nodes, right ? On Thu, Apr 10, 2014 at 3:21 PM, Jason Jackson wrote: > > > > stream.aggregate(...) computes aggregations per batch > stream.groupBy(..).aggregate(..) computes aggregations per key per batch. > > stream.persistentAggregate(..) computes partial aggregations per batch, DB > is updated with partial aggregates per batch > stream.groupBy(..).persistentAggregate(..) computes partial aggregations > per batch per key, DB is updated with partial aggregates per batch. > > You can also think of persistentAggregate as computing DB deltas, and > only sending the deltas to the DB. > > Whether increasing the batch size reduces the total amount of writes to > persistent store is not strictly true, but in practice it does. E.g. > imagine our stream has 5 unique keys and you do a > groupBy.persistentAggregate, and the throughput is 1B/sec. If you have > batches of 5B items, then after 5 seconds you've sent ~10 key/val updates to > the DB; if you have batches of 0.5B items, then after 10 seconds you've > sent ~100 key/vals to the DB. > > Some of the tradeoffs here are that if a batch fails you have to do more > recomputation, and there's a greater chance for a batch to fail as there are more > tuples per batch. This should help though; definitely give it a shot. > > > > > > > On Thu, Apr 10, 2014 at 1:33 PM, Raphael Hsieh wrote: > >> Thanks for your reply Jason, >> So what I'm hearing is that there is no nice way of doing temporal >> flushes to a database. My main reason for wanting to do this is because I >> want to use DynamoDB for my external datastore, but it gets expensive. I >> would like to limit my reads and writes as much as I can so that the cost >> does not add up. 
>> >> Increasing the batch size seems like the best solution so far; however, >> from my understanding, doing an aggregation in storm/trident does a global >> aggregation, so do batch sizes really make a difference? Or is my >> understanding of the aggregation process wrong? I had thought that >> aggregating was global among all partitions (and storm nodes). >> >> >> On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson wrote: >> >>> trident doesn't expose tick tuples in its API yet, even though it was >>> added to storm a while ago. >>> >>> There are two problems I think you're talking about: (1) windowed >>> aggregations, (2) reducing DB load. >>> >>> For (1): >>> Trident can do aggregations at the batch level, but this doesn't really >>> help you for doing aggregations over a range of timestamps. The way to do >>> that is to include the time bucket in your key when you >>> persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40" >>> for minutely buckets. Then when serving the data you would query all keys >>> across the time range. Certain databases such as Cassandra can make this >>> query very fast. >>> >>> For (2): You'll need to implement your own IBackingMap persistent store >>> plugin and pass it to persistentAggregate. Look at other examples such as >>> trident-memcache for how to implement these. For your custom persistent >>> store plugin you could use a combination of an in-memory map and the DB. 4 out of 5 >>> batches would just commit their state updates to the in-memory map. The 5th >>> batch would commit to the in-memory map and then flush that map to the >>> database. You could even launch a separate thread to do the flushing, >>> in case it takes a while. This design, however, is not going to give you >>> exactly-once semantics: if you lose the in-memory map because your >>> worker died, for example, then when it comes back online it will still >>> resume from the last successful batch (not the last flushed batch). 
>>> >>> To retain exactly-once semantics you could also make your batch sizes >>> much larger; by default they read 1MB from each kafka partition (see the >>> bufferSize and fetchSize configuration options in the Kafka Spout). If you >>> increase the batch size, and you're doing some kind of key-based aggregation, >>> then this would reduce the total number of writes you need to do to your >>> persistent storage. >>> >>> Trident could definitely be improved here, so your mileage may vary.
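Jason's time-bucketed-key suggestion for (1) can be sketched without any Storm dependencies. The helper below is hypothetical (not from the thread); it just shows one way to build an entity-plus-minute composite key in the spirit of the "apple-2014-01-02-12:40" example, so that a range query over keys covers a time range:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimeBucketKey {
    // Format timestamps down to the minute, matching the minutely-bucket idea above.
    private static final DateTimeFormatter MINUTE_FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm").withZone(ZoneOffset.UTC);

    // Build a composite key of entity + minute bucket.
    public static String bucketKey(String entity, long epochMillis) {
        return entity + "-" + MINUTE_FMT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 2014-01-02T12:40:30Z in epoch millis; the seconds are dropped by the bucket.
        System.out.println(bucketKey("apple", 1388666430000L));
    }
}
```

In a real topology this computation would live in a Trident Function that emits the bucket as an extra field, so the subsequent groupBy includes it in the aggregation key.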
Re: setting trident transaction window
Thanks for your reply Jason, So what I'm hearing is that there is no nice way of doing temporal flushes to a database. My main reason for wanting to do this is that I want to use DynamoDB for my external datastore, but it gets expensive. I would like to limit my reads and writes as much as I can so that the cost does not add up. Increasing the batch size seems like the best solution so far; however, from my understanding, doing an aggregation in storm/trident does a global aggregation, so do batch sizes really make a difference? Or is my understanding of the aggregation process wrong? I had thought that aggregating was global among all partitions (and storm nodes). On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson wrote: > trident doesn't expose tick tuples in its API yet, even though it was > added to storm a while ago. > > There are two problems I think you're talking about: (1) windowed > aggregations, (2) reducing DB load. > > For (1): > Trident can do aggregations at the batch level, but this doesn't really > help you for doing aggregations over a range of timestamps. The way to do > that is to include the time bucket in your key when you > persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40" > for minutely buckets. Then when serving the data you would query all keys > across the time range. Certain databases such as Cassandra can make this > query very fast. > > For (2): You'll need to implement your own IBackingMap persistent store plugin > and pass it to persistentAggregate. Look at other examples such as > trident-memcache for how to implement these. For your custom persistent > store plugin you could use a combination of an in-memory map and the DB. 4 out of 5 > batches would just commit their state updates to the in-memory map. The 5th > batch would commit to the in-memory map and then flush that map to the > database. You could even launch a separate thread to do the flushing, > in case it takes a while. 
This design, however, is not going to give you > exactly-once semantics: if you lose the in-memory map because your > worker died, for example, then when it comes back online it will still > resume from the last successful batch (not the last flushed batch). > > To retain exactly-once semantics you could also make your batch sizes much > larger; by default they read 1MB from each kafka partition (see the bufferSize > and fetchSize configuration options in the Kafka Spout). If you increase the batch > size, and you're doing some kind of key-based aggregation, then this would > reduce the total number of writes you need to do to your persistent > storage. > > Trident could definitely be improved here, so your mileage may vary. > > > On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh wrote: > >> I have been struggling to figure out how to get trident to aggregate data >> over a certain time period and then flush the data to an external data store. >> The reasoning behind this is to reduce the number of reads and writes >> sent to the database. >> >> I've seen that Storm allows for tick tuples to be inserted into the >> stream; however, I can't figure out how to do this with trident. I had >> thought that this functionality was added in Storm version 0.8.0? >> Is this the case? >> >> One thing I tried was to create a new stream that emitted a tuple >> once every X time period, then merge this stream into my actual >> data stream. However, doing this would result in a non-transactional stream, >> which would be no good. Also, it didn't work, as the resulting stream only >> consisted of tuples from my clock stream. >> >> Can anybody help me figure out how to have Trident aggregate data over a >> certain time frame, flush it out to an external datastore, then rinse and >> repeat? >> >> There are some blogs out there regarding how to use a sliding window in >> storm; however, I just want sequential windows in Trident. 
>> >> Thanks >> >> -- >> Raphael Hsieh >> >> >> >> > > -- Raphael Hsieh Amazon.com Software Development Engineer I (978) 764-9014
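The "commit most batches to an in-memory map, flush every Nth batch to the DB" idea from Jason's reply can be sketched in plain Java. This is an illustration of the buffering logic only, under stated assumptions: it does not implement Storm's IBackingMap interface, `commitBatch` is a hypothetical hook standing in for multiPut, and a HashMap stands in for the real external store (DynamoDB in this thread):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: buffer partial aggregates in memory, flush to the "DB" every Nth batch.
public class BufferedFlushState {
    private final Map<String, Long> buffer = new HashMap<>();
    private final Map<String, Long> db;          // stand-in for the external store
    private final int flushEvery;                // e.g. 5 -> flush on every 5th batch
    private int batchesSinceFlush = 0;

    public BufferedFlushState(Map<String, Long> db, int flushEvery) {
        this.db = db;
        this.flushEvery = flushEvery;
    }

    // Called once per committed batch with that batch's partial aggregates.
    public void commitBatch(Map<String, Long> batchDeltas) {
        batchDeltas.forEach((k, v) -> buffer.merge(k, v, Long::sum));
        if (++batchesSinceFlush >= flushEvery) {
            // Push the accumulated deltas to the store, then reset the buffer.
            buffer.forEach((k, v) -> db.merge(k, v, Long::sum));
            buffer.clear();
            batchesSinceFlush = 0;
        }
    }
}
```

As the thread notes, this trades away exactly-once semantics: if the worker dies between flushes, the buffered deltas are lost while Trident resumes from the last successful batch, so the flushed totals can drift.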
setting trident transaction window
I have been struggling to figure out how to get trident to aggregate data over a certain time period and then flush the data to an external data store. The reasoning behind this is to reduce the number of reads and writes sent to the database. I've seen that Storm allows for tick tuples to be inserted into the stream; however, I can't figure out how to do this with trident. I had thought that this functionality was added in Storm version 0.8.0? Is this the case? One thing I tried was to create a new stream that emitted a tuple once every X time period, then merge this stream into my actual data stream. However, doing this would result in a non-transactional stream, which would be no good. Also, it didn't work, as the resulting stream only consisted of tuples from my clock stream. Can anybody help me figure out how to have Trident aggregate data over a certain time frame, flush it out to an external datastore, then rinse and repeat? There are some blogs out there regarding how to use a sliding window in storm; however, I just want sequential windows in Trident. Thanks -- Raphael Hsieh
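For reference, the sequential (tumbling, non-sliding) windows the post asks about can be sketched in plain Java. This hypothetical accumulator is not Trident code; it just illustrates the mechanics of closing a window and flushing it downstream whenever an incoming tuple's timestamp crosses the window boundary, which is the "aggregate over a time frame, flush, rinse and repeat" behavior described above:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Sketch: accumulate per-key counts; when a tuple arrives in a new window,
// flush the closed window's counts and start fresh.
public class TumblingWindow {
    private final long windowMillis;
    private final BiConsumer<Long, Map<String, Long>> flush; // (windowStart, counts)
    private final Map<String, Long> counts = new HashMap<>();
    private long windowStart = -1;

    public TumblingWindow(long windowMillis, BiConsumer<Long, Map<String, Long>> flush) {
        this.windowMillis = windowMillis;
        this.flush = flush;
    }

    public void onTuple(String key, long timestampMillis) {
        long start = timestampMillis - (timestampMillis % windowMillis);
        if (windowStart >= 0 && start != windowStart) {
            flush.accept(windowStart, new HashMap<>(counts)); // emit the closed window
            counts.clear();
        }
        windowStart = start;
        counts.merge(key, 1L, Long::sum);
    }
}
```

Note this flushes lazily (when the next window's first tuple arrives) rather than on a wall-clock tick, which avoids the non-transactional clock-stream merge the post ran into, at the cost of delayed flushes on an idle stream.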