Trident Metrics Consumer

2014-09-26 Thread Raphael Hsieh
I've been following the tutorials here (
http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to)  to
create metrics in Storm.

However, I am using Trident, which abstracts bolts away from the user. How
can I go about creating metrics in Trident?
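
For what it's worth, Trident does expose metric registration through
TridentOperationContext, so a custom function can still register its own
metrics even though the bolts themselves are generated. A minimal sketch,
assuming the 0.9.x API (class and metric names are only illustrative):

import java.util.Map;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;

public class CountingFunction extends BaseFunction {
    private transient CountMetric tuplesSeen;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // TridentOperationContext can register metrics even though the
        // underlying bolts are generated by Trident; 60 is the time bucket in seconds
        tuplesSeen = context.registerMetric("tuples-seen", new CountMetric(), 60);
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        tuplesSeen.incr();                              // count every tuple that passes through
        collector.emit(new Values(tuple.getValue(0)));  // re-emit the first input field
    }
}

Wired in with something like .each(new Fields("x"), new CountingFunction(),
new Fields("x-out")), the counts then show up in whatever metrics consumer is
registered on the topology Config.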

Thanks

-- 
Raphael Hsieh


Re: LoggingMetricsConsumer

2014-09-26 Thread Raphael Hsieh
Thanks for your response John,
Could you explain to me how this would work when I am using Trident? Since
with Trident bolts are abstracted away from the user, how might I configure
my own MetricsConsumerBolt / debug it to figure out why it isn't calling
"handleDataPoints()" ? my metricsConsumer's "prepare()" and "cleanup()"
methods are called, but never the handleDataPoints() function.
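
For reference, the registration itself is just topology configuration; a
sketch (MyMetricsConsumer and the argument key are placeholders, and passing a
plain Map as the argument avoids the serialization pitfall John describes
below):

import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.metric.LoggingMetricsConsumer;

Config conf = new Config();
// the packaged LoggingMetricsConsumer takes no argument
conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
// a custom consumer can take an argument; a plain Map serializes cleanly
Map<String, Object> consumerArgs = new HashMap<String, Object>();
consumerArgs.put("report.target", "log");   // hypothetical key and value
conf.registerMetricsConsumer(MyMetricsConsumer.class, consumerArgs, 1);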

Thanks

On Thu, Sep 25, 2014 at 1:33 PM, John Reilly  wrote:

> It is called by the MetricsConsumerBolt which is created by storm when a
> worker is starting up.  When you define a metrics consumer, you should see
> metrics output every 60 seconds.  Also, I think the metrics code was only
> introduced in 0.9.0 so you would need to be running at least that version.
>
> One other issue I ran into when registering a metrics consumer was that
> the config args I was passing initially caused failures because of serialization issues.
> When I used a Map instead of a serializable class that I created, it worked
> fine.  For the packaged LoggingMetricsConsumer there is no config though.
> I think I did run into an issue when trying to configure
> both LoggingMetricsConsumer and my own metrics consumer.  IIRC, if
> initialization of my consumer failed, the LoggingMetricsConsumer would also
> fail. It may have depended on the order that I was registering them in,
> but I don't remember exactly.
>
> Cheers,
> John
>
> On Thu, Sep 25, 2014 at 10:07 AM, Raphael Hsieh 
> wrote:
>
>> Hi, I've been trying to figure out why registering a
>> LoggingMetricsConsumer isn't working for me.
>>
>> I've been able to figure out that it is indeed running, however the
>> "handleDataPoints()" function is never called. Can someone explain to me
>> how this class is used by Storm in order to log metrics?
>> When is the handleDataPoints function called?
>>
>> Thanks
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>
>


-- 
Raphael Hsieh


LoggingMetricsConsumer

2014-09-25 Thread Raphael Hsieh
Hi, I've been trying to figure out why registering a
LoggingMetricsConsumer isn't working for me.

I've been able to figure out that it is indeed running, however the
"handleDataPoints()" function is never called. Can someone explain to me
how this class is used by Storm in order to log metrics?
When is the handleDataPoints function called?

Thanks

-- 
Raphael Hsieh


Re: metrics consumer logging stormUI data

2014-09-24 Thread Raphael Hsieh
In order to get this / have the metrics consumer work, do I need to have
the setDebug attribute set to true?

On Mon, Sep 22, 2014 at 12:59 PM, Harsha  wrote:

>  Here is what I see in the metrics.log
>
> 2014-09-22 09:44:31,321 731751411404271   localhost:6703
> 19:split   __transfer-count{default=2680}
>
> 2014-09-22 09:44:31,321 731751411404271   localhost:6703
> 19:split   __execute-latency   {spout:default=0.0}
>
> 2014-09-22 09:44:31,321 731751411404271   localhost:6703
> 19:split   __fail-count{}
>
> 2014-09-22 09:44:31,321 731751411404271   localhost:6703
> 19:split   __emit-count{default=2680}
>
> 2014-09-22 09:44:31,321 731751411404271   localhost:6703
> 19:split   __execute-count {spout:default=420}
>
> 2014-09-22 09:44:31,352 731791411404271   localhost:6703
> 22:split   __ack-count {spout:default=420}
>
> 2014-09-22 09:44:31,352 731791411404271   localhost:6703
> 22:split   __sendqueue {write_pos=2679, capacity=1024,
> read_pos=2679, population=0}
> I do see all the UI related counts coming in the metrics.log.
>
> -Harsha
>
>
> On Mon, Sep 22, 2014, at 10:41 AM, Raphael Hsieh wrote:
>
> Hi Harsha,
> Did you have to bind the metrics consumer to the default StormUI metrics
> at all? Or do those automagically get included ?
>
> Thanks!
>
> On Mon, Sep 22, 2014 at 10:33 AM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> Hi Gezim,
>
> On Fri, Sep 19, 2014 at 7:27 PM, Gezim Musliaj  wrote:
>
> Hey Otis, I just registered at sematext and I can say that this is
> what I have been looking for. I have just one question: what about the
> delays between SPM and the Storm cluster (if they do exist)? What's the
> worst case? I mean because these metrics are not calculated locally, but
> using an internet connection.
>
>
>
> The worst case is that somebody unplugs your servers from the network, but
> if that happens you have bigger problems to deal with.  In all seriousness,
> Storm (local) => SPM (remote/cloud/saas) is not really a problem -- lots of
> people successfully use SPM for monitoring Storm, Hadoop, Kafka, and other
> types of systems.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
>
>
> Thanks !
>
> On Sat, Sep 20, 2014 at 1:15 AM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> Raphael,
>
> Not sure if this is what you are after, but SPM <http://sematext.com/spm/>
> will collect and graph all Storm metrics, let you do alerting and anomaly
> detection on them, etc.  If you want to graph custom metrics (e.g.
> something from your bolts), you can send them in as custom metrics
> <https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics> and
> again graph them, alert on them, do anomaly detection on them, stick them
> on dashboards, etc.  If you want to emit events from your bolts, you can send
> events to SPM
> <https://sematext.atlassian.net/wiki/display/PUBSPM/Events+Integration>,
> too, or you can send them to Logsene <http://www.sematext.com/logsene/>...
> can be handy for correlation with alerts and performance graphs when
> troubleshooting.  Here are some Storm metrics graphs:
> http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/
>
> I hope this helps.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Fri, Sep 19, 2014 at 6:12 PM, Raphael Hsieh 
> wrote:
>
> Hi,
> Using Storm/Trident, how do I register a metrics consumer to log the data
> I get in the StormUI ?
> I want to look at historical data of my topology, for example the execute
> latency of the topology over time, as this would give me good insight as to
> where things might be going wrong when the system breaks.
>
> I have been following the steps outlined in the BigData CookBook here:
> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to
>
> However I am not wanting to create my own metrics, instead I just want to
> log the metrics that already exist built in to Storm. It is unclear to me
> how I am supposed to go about doing that.
>
> Thanks
>
> --
> Raphael Hsieh
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Raphael Hsieh
>
>
>
>
>
>



-- 
Raphael Hsieh


Re: metrics consumer logging stormUI data

2014-09-22 Thread Raphael Hsieh
Hi Harsha,
Did you have to bind the metrics consumer to the default StormUI metrics at
all? Or do those automagically get included ?

Thanks!

On Mon, Sep 22, 2014 at 10:33 AM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi Gezim,
>
> On Fri, Sep 19, 2014 at 7:27 PM, Gezim Musliaj  wrote:
>
>> Hey Otis, I just registered at sematext and I can say that this is
>> what I have been looking for. I have just one question: what about the
>> delays between SPM and the Storm cluster (if they do exist)? What's the
>> worst case? I mean because these metrics are not calculated locally, but
>> using an internet connection.
>>
>
> The worst case is that somebody unplugs your servers from the network, but
> if that happens you have bigger problems to deal with.  In all seriousness,
> Storm (local) => SPM (remote/cloud/saas) is not really a problem -- lots of
> people successfully use SPM for monitoring Storm, Hadoop, Kafka, and other
> types of systems.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
>
>
>> Thanks !
>>
>> On Sat, Sep 20, 2014 at 1:15 AM, Otis Gospodnetic <
>> otis.gospodne...@gmail.com> wrote:
>>
>>> Raphael,
>>>
>>> Not sure if this is what you are after, but SPM
>>> <http://sematext.com/spm/> will collect and graph all Storm metrics,
>>> let you do alerting and anomaly detection on them, etc.  If you want to
>>> graph custom metrics (e.g. something from your bolts), you can send them in
>>> as custom metrics
>>> <https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics> and
>>> again graph them, alert on them, do anomaly detection on them, stick them
>>> on dashboards, etc.  If you want to emit events from your bolts, you can 
>>> send
>>> events to SPM
>>> <https://sematext.atlassian.net/wiki/display/PUBSPM/Events+Integration>,
>>> too, or you can send them to Logsene <http://www.sematext.com/logsene/>...
>>> can be handy for correlation with alerts and performance graphs when
>>> troubleshooting.  Here are some Storm metrics graphs:
>>> http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/
>>>
>>>
>>> I hope this helps.
>>>
>>> Otis
>>> --
>>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>>> Solr & Elasticsearch Support * http://sematext.com/
>>>
>>>
>>> On Fri, Sep 19, 2014 at 6:12 PM, Raphael Hsieh 
>>> wrote:
>>>
>>>> Hi,
>>>> Using Storm/Trident, how do I register a metrics consumer to log the
>>>> data I get in the StormUI ?
>>>> I want to look at historical data of my topology, for example the
>>>> execute latency of the topology over time, as this would give me good
>>>> insight as to where things might be going wrong when the system breaks.
>>>>
>>>> I have been following the steps outlined in the BigData CookBook here:
>>>> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to
>>>>
>>>> However I am not wanting to create my own metrics, instead I just want
>>>> to log the metrics that already exist built in to Storm. It is unclear to
>>>> me how I am supposed to go about doing that.
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> Raphael Hsieh
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>


-- 
Raphael Hsieh


Re: metrics consumer logging stormUI data

2014-09-22 Thread Raphael Hsieh
Thanks Harsha and Otis for your prompt responses.
I'm looking to somehow log these metrics to use for an in-house monitoring
system. I don't want to get user provided metrics just yet.

From what I've gathered from the big data cookbook, I just want to
create a metrics consumer to read these metrics and print them out to a log
file. In order to do this I have added this to my config:

config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

which should create a LoggingMetricsConsumer with a parallelism of 2 (I
believe). I was led to believe that these logs would be put in a file
called "metrics.log". However, after adding this to my topology I have been
unable to find such a log.
If someone could explain to me what I might be missing that would be great.
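
A sketch of the whole submission for comparison, assuming Storm 0.9.x with the
default logback setup, where the packaged consumer writes to logs/metrics.log
on each worker node rather than on the machine that submits the topology (the
topology name is a placeholder):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.metric.LoggingMetricsConsumer;

Config config = new Config();
config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);
// topology construction omitted
StormSubmitter.submitTopology("my-topology", config, topology.build());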

Thanks!

On Fri, Sep 19, 2014 at 4:27 PM, Gezim Musliaj  wrote:

> Hey Otis, I just registered at sematext and I can say that this is
> what I have been looking for. I have just one question: what about the
> delays between SPM and the Storm cluster (if they do exist)? What's the
> worst case? I mean because these metrics are not calculated locally, but
> using an internet connection.
> Thanks !
>
> On Sat, Sep 20, 2014 at 1:15 AM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
>> Raphael,
>>
>> Not sure if this is what you are after, but SPM
>> <http://sematext.com/spm/> will collect and graph all Storm metrics, let
>> you do alerting and anomaly detection on them, etc.  If you want to graph
>> custom metrics (e.g. something from your bolts), you can send them in as 
>> custom
>> metrics
>> <https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics> and
>> again graph them, alert on them, do anomaly detection on them, stick them
>> on dashboards, etc.  If you want to emit events from your bolts, you can send
>> events to SPM
>> <https://sematext.atlassian.net/wiki/display/PUBSPM/Events+Integration>,
>> too, or you can send them to Logsene <http://www.sematext.com/logsene/>...
>> can be handy for correlation with alerts and performance graphs when
>> troubleshooting.  Here are some Storm metrics graphs:
>> http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/
>>
>>
>> I hope this helps.
>>
>> Otis
>> --
>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> Solr & Elasticsearch Support * http://sematext.com/
>>
>>
>> On Fri, Sep 19, 2014 at 6:12 PM, Raphael Hsieh 
>> wrote:
>>
>>> Hi,
>>> Using Storm/Trident, how do I register a metrics consumer to log the
>>> data I get in the StormUI ?
>>> I want to look at historical data of my topology, for example the
>>> execute latency of the topology over time, as this would give me good
>>> insight as to where things might be going wrong when the system breaks.
>>>
>>> I have been following the steps outlined in the BigData CookBook here:
>>> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to
>>>
>>> However I am not wanting to create my own metrics, instead I just want
>>> to log the metrics that already exist built in to Storm. It is unclear to
>>> me how I am supposed to go about doing that.
>>>
>>> Thanks
>>>
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>>
>


-- 
Raphael Hsieh


metrics consumer logging stormUI data

2014-09-19 Thread Raphael Hsieh
Hi,
Using Storm/Trident, how do I register a metrics consumer to log the data I
get in the StormUI ?
I want to look at historical data of my topology, for example the execute
latency of the topology over time, as this would give me good insight as to
where things might be going wrong when the system breaks.

I have been following the steps outlined in the BigData CookBook here:
http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to

However, I don't want to create my own metrics; I just want to
log the metrics that are already built in to Storm. It is unclear to me
how I am supposed to go about doing that.

Thanks

-- 
Raphael Hsieh


Re: Parallelism for KafkaSpout

2014-07-23 Thread Raphael Hsieh
how do you tell how many partitions are used in a specific bolt on the
topology ?


On Wed, Jul 23, 2014 at 2:15 PM, Nathan Leung  wrote:

> In your example, five spouts would get data and the other five would not.
> On Jul 23, 2014 5:11 PM, "Kashyap Mhaisekar"  wrote:
>
>> Hi,
>> Is the no. of executors for KafkaSpout dependent on the partitions for
>> the topic?
>> For E.g.,
>> Say kafka TopicA has 5 partitions.
>> If I have a KafkaSpout that has the parallelism hint set to 10, will all
>> the executors get the data? Or am i constrained by the no. of partitions
>> declared for the topic?
>>
>> Regards,
>> Kashyap
>>
>


-- 
Raphael Hsieh


Re: Naming Components In Trident Topology

2014-07-22 Thread Raphael Hsieh
Is there any follow up to this thread?
http://mail-archives.apache.org/mod_mbox/storm-user/201312.mbox/%3C29605543.1418.1386695558006.JavaMail.Laurent@Laurent-PC%3E

For some reason when I make names for each bolt, some of the names seem to
persist to the downstream bolts and this gets confusing.

Is there a way to clearly label each of my bolts with unique names that
don't spill over to other bolts ?
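
For reference, a sketch of the pattern from Cody's example in the Nimbus UI
thread, with each .name() call placed right before the stage it should label
(ParseEvent and the field names are made up):

import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;

TridentTopology topology = new TridentTopology();
topology.newStream("events", spout)        // 'spout' defined elsewhere
    .name("ParseAndFilter")
    .each(new Fields("raw"), new ParseEvent(), new Fields("event"))
    .name("CountByEvent")
    .groupBy(new Fields("event"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));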


On Thu, Jun 26, 2014 at 7:22 AM, Justin Workman 
wrote:

> Thanks for the replies. This does seem to work!
>
>
> On Wed, Jun 25, 2014 at 5:47 PM, Sam  wrote:
>
>>  I believe you can just call #name. E.g. persistentAggregate( ...
>> ).name( "name" )
>>  --
>> From: Justin Workman 
>> Sent: ‎6/‎25/‎2014 8:56 AM
>> To: user@storm.incubator.apache.org
>> Subject: Naming Components In Trident Topology
>>
>> This is probably a silly question, that I am just overlooking the answer
>> to. We are playing with our first trident topology running under storm
>> 0.9.1. Is there a way to descriptively name the components so they are
>> meaningful in the UI. Similar to how the components appear in the normal
>> storm topology.
>>
>> Currently the UI shows
>>
>> $mastercoord-bg0 for the spout
>>
>> then the following for the bolts
>> $spoutcoord-spout0
>> b-1
>> b-0
>>
>> Is there anyway to make this more friendly.
>>
>> Thanks
>> Justin
>>
>
>


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Re: Max Spout Pending

2014-07-14 Thread Raphael Hsieh
Is there a way to tell how many batches my topology processes per second ?
Or for that matter how many tuples are processed per second ?
Aside from creating a new bolt purely for that aggregation ?


On Mon, Jul 14, 2014 at 2:08 PM, Carlos Rodriguez 
wrote:

> Max spout pending config specifies how many *batches* can be processed
> simultaneously by your topology.
> Thats why 48,000 seems absurdly high to you. Divide it between the batch
> size and you'll get the max spout pending config that you were expecting.
>
>
> 2014-07-14 19:00 GMT+02:00 Raphael Hsieh :
>
> What is the optimal max spout pending to use in a topology ?
>> I found this thread here:
>> http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3cca+avhzatfg_s88lkombvommkh-rafwr6szy0i8b8tm3rfab...@mail.gmail.com%3E
>> that didn't seem to have a follow up.
>>
>> Part of it says to
>>
>> "Start with a max spout pending that is for sure too small -- one for
>> trident, or the number of executors for storm -- and increase it until you
>> stop seeing changes in the flow. You'll probably end up with something
>> near 2*(throughput
>> in recs/sec)*(end-to-end latency) (2x the Little's law capacity)."
>>
>> Does this make sense for a Max Spout Pending value ?
>> I expect my topology to have a throughput of around 80,000/s and I've
>> been seeing a complete latency of around 300ms, so given this formula, I'd
>> want 2 * 80,000 * 0.3 = 48,000 Max Spout Pending.
>>
>> This seems absurdly high to me..
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Carlos Rodríguez
> Developer at ENEO Tecnología
> http://redborder.net/
> http://lnkd.in/bgfCVF9
>



-- 
Raphael Hsieh


Max Spout Pending

2014-07-14 Thread Raphael Hsieh
What is the optimal max spout pending to use in a topology ?
I found this thread here:
http://mail-archives.apache.org/mod_mbox/storm-user/201402.mbox/%3cca+avhzatfg_s88lkombvommkh-rafwr6szy0i8b8tm3rfab...@mail.gmail.com%3E
that didn't seem to have a follow up.

Part of it says to

"Start with a max spout pending that is for sure too small -- one for
trident, or the number of executors for storm -- and increase it until you
stop seeing changes in the flow. You'll probably end up with something
near 2*(throughput
in recs/sec)*(end-to-end latency) (2x the Little's law capacity)."

Does this make sense for a Max Spout Pending value ?
I expect my topology to have a throughput of around 80,000/s and I've been
seeing a complete latency of around 300ms, so given this formula, I'd want
2 * 80,000 * 0.3 = 48,000 Max Spout Pending.

This seems absurdly high to me..

-- 
Raphael Hsieh


Re: Spout process latency

2014-07-09 Thread Raphael Hsieh
So the issue i'm having is that when my system is healthy my "spout0" bolt
will have around 66ms of latency. Every once in awhile my upstream
dependency that pushes data into a Kafka Stream will spike to maybe
10,000ms latency. However, instead of increasing my topology's latency by
only 10,000ms, mine shoots up to roughly 100,000ms, and I'm not entirely
sure why. I took a look at the Storm UI and it seems that the latency of my
system isn't too bad at all, EXCEPT for the spout0 bolt. That guy jumps to
around 150ms process latency.

So I'm trying to figure out why this might be.


On Wed, Jul 9, 2014 at 1:28 PM, Derek Dagit  wrote:

> It should be a windowed average measure of the time between when the
> component receives a tuple and when it acks the tuple.
>
> This can be slower if there is batching, aggregating, or joining happening
> (the component must wait for a number of other tuples to arrive before it
> can ack).
>
> On the UI, there are tool tips that explain the measurements.  They appear
> after hovering over the label.
> --
> Derek
>
>
> On 7/9/14, 15:22, Raphael Hsieh wrote:
>
>> Can somebody explain to me what might cause the spout to have a large
>> process latency? Currently my spout0 and $spoutcoord-spout0 have latencies
>> higher than I would like.
>> I'm consuming data from a Kafka stream.
>>
>> How is this process latency measured ?
>> Is it measuring the amount of time it takes to fill a batch with data and
>> send it to the first bolt in the topology?
>>
>> Thanks
>>
>>


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Spout process latency

2014-07-09 Thread Raphael Hsieh
Can somebody explain to me what might cause the spout to have a large
process latency? Currently my spout0 and $spoutcoord-spout0 have latencies
higher than I would like.
I'm consuming data from a Kafka stream.

How is this process latency measured ?
Is it measuring the amount of time it takes to fill a batch with data and
send it to the first bolt in the topology?

Thanks

-- 
Raphael Hsieh


topology system metrics

2014-07-08 Thread Raphael Hsieh
Is there a way to get a hold of the topology's system metrics and send it
to an external datastore such as dynamoDb ?
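
Related sketch, in case it helps anyone searching later: the built-in numbers
arrive through the IMetricsConsumer interface, so forwarding them to an
external store is a matter of implementing handleDataPoints() and registering
the class on the Config. The sendToDatastore() call below is only a
placeholder for something like a DynamoDB put:

import java.util.Collection;
import java.util.Map;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

public class DatastoreMetricsConsumer implements IMetricsConsumer {

    @Override
    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // set up the datastore client here
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        // called once per metrics time bucket (every 60 seconds by default) per task
        for (DataPoint dp : dataPoints) {
            sendToDatastore(taskInfo.srcComponentId, dp.name, dp.value);
        }
    }

    @Override
    public void cleanup() {
        // close the datastore client here
    }

    private void sendToDatastore(String component, String name, Object value) {
        // placeholder: write (component, metric name, value) to the external store
    }
}

Registered with conf.registerMetricsConsumer(DatastoreMetricsConsumer.class, 1),
the same data points that back the Storm UI then flow through it.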

-- 
Raphael Hsieh


Re: key values in PersistentAggregate

2014-07-02 Thread Raphael Hsieh
actually I think this is a non-issue,
given the field exists in the stream already, I should be able to access it
right ?
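
For completeness, a sketch with all four arguments filled in (Count is just a
placeholder aggregator; the grouping field "a" becomes the key of the state
and "b" is the input field handed to the aggregator):

stream
    .groupBy(new Fields("a"))
    .persistentAggregate(
        new Factory(),            // the custom StateFactory from above
        new Fields("b"),          // input field selected from the stream for the aggregator
        new Count(),              // placeholder aggregator
        new Fields("count"));     // field emitted by the aggregation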


On Wed, Jul 2, 2014 at 10:27 AM, Raphael Hsieh  wrote:

> From my understanding, if I implement my own state factory to use in
> PersistentAggregate, the grouping fields will be the key values in the
> state,
> However, if I want to have access to other fields in the aggregation, how
> might I get those ? From my understanding, doing a groupBy() will create a
> new GroupedStream which will only have the fields specified in the
> groupBy().
>
> Essentially what I want to do is:
> stream
> .groupBy(new Fields("a"))
> .persistentAggregate(
> new Factory(),
> new Fields("b"),
> ...
>     )
>
> How would I do this ?
>
> --
> Raphael Hsieh
>
>
>
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


key values in PersistentAggregate

2014-07-02 Thread Raphael Hsieh
From my understanding, if I implement my own state factory to use in
PersistentAggregate, the grouping fields will be the key values in the
state,
However, if I want to have access to other fields in the aggregation, how
might I get those ? From my understanding, doing a groupBy() will create a
new GroupedStream which will only have the fields specified in the
groupBy().

Essentially what I want to do is:
stream
.groupBy(new Fields("a"))
.persistentAggregate(
new Factory(),
new Fields("b"),
...
)

How would I do this ?

-- 
Raphael Hsieh


Re: using CachedMap in a trident state

2014-06-13 Thread Raphael Hsieh
If we don't serialize the data when we store it in the cache, doesn't that
defeat the purpose of having an OpaqueValue in order to keep transactional
consistency and the processed exactly once semantics?


On Thu, Jun 12, 2014 at 8:57 AM, Raphael Hsieh  wrote:

> How come we only serialize data when storing it into the external
> datastore and not the local cache then?
>
>
> On Thu, Jun 12, 2014 at 7:56 AM, Romain Leroux 
> wrote:
>
>>
>> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java#L62
>> localCacheSize sets the size of the LRU cache used locally, so no
>> CachedMap data are not replicated among all host.
>>
>> The idea is that you use this on a grouped stream to store the result of
>> your aggregation.
>> So based on the hash (the groupBy fields) all related tuples always go to
>> the same host.
>> You want to avoid storing the same cached data in all hosts, otherwise
>> scaling out is meaningless.
>>
>>
>>
>> 2014-06-12 6:58 GMT+09:00 Raphael Hsieh :
>>
>> I am imitating the MemCachedState class found here:
>>> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java
>>> .
>>>
>>> I was wondering what the CachedMap is used for. I see that it creates a
>>> cache layer between the final datastore in order to retrieve values much
>>> quicker than accessing the datastore every time, however I am unsure about
>>> a couple details.
>>>
>>> Is this cache replicated among all hosts? When I do a 'multiGet' I
>>> expect to retrieve data I had previously stored. If the cache is specific
>>> to each host, I wouldn't necessarily get the same data I had most recently
>>> stored.
>>>
>>> Also, how does this work with Opaque Transactional consistency? It seems
>>> that in the MemCachedState example we serialize the data once we store it
>>> to the external datastore, however the data in the cache is not serialized.
>>> why is this?
>>> Shouldn't the local cache have the same data as the external datastore ?
>>>
>>> thanks
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>>
>
>
> --
> Raphael Hsieh
>
>


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Re: using CachedMap in a trident state

2014-06-12 Thread Raphael Hsieh
How come we only serialize data when storing it into the external datastore
and not the local cache then?


On Thu, Jun 12, 2014 at 7:56 AM, Romain Leroux  wrote:

>
> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java#L62
> localCacheSize sets the size of the LRU cache used locally, so no
> CachedMap data are not replicated among all host.
>
> The idea is that you use this on a grouped stream to store the result of
> your aggregation.
> So based on the hash (the groupBy fields) all related tuples always go to
> the same host.
> You want to avoid storing the same cached data in all hosts, otherwise
> scaling out is meaningless.
>
>
>
> 2014-06-12 6:58 GMT+09:00 Raphael Hsieh :
>
> I am imitating the MemCachedState class found here:
>> https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java
>> .
>>
>> I was wondering what the CachedMap is used for. I see that it creates a
>> cache layer between the final datastore in order to retrieve values much
>> quicker than accessing the datastore every time, however I am unsure about
>> a couple details.
>>
>> Is this cache replicated among all hosts? When I do a 'multiGet' I expect
>> to retrieve data I had previously stored. If the cache is specific to each
>> host, I wouldn't necessarily get the same data I had most recently stored.
>>
>> Also, how does this work with Opaque Transactional consistency? It seems
>> that in the MemCachedState example we serialize the data once we store it
>> to the external datastore, however the data in the cache is not serialized.
>> why is this?
>> Shouldn't the local cache have the same data as the external datastore ?
>>
>> thanks
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>


-- 
Raphael Hsieh


using CachedMap in a trident state

2014-06-11 Thread Raphael Hsieh
I am imitating the MemCachedState class found here:
https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java
.

I was wondering what the CachedMap is used for. I see that it creates a
cache layer between the final datastore in order to retrieve values much
quicker than accessing the datastore every time, however I am unsure about
a couple details.

Is this cache replicated among all hosts? When I do a 'multiGet' I expect
to retrieve data I had previously stored. If the cache is specific to each
host, I wouldn't necessarily get the same data I had most recently stored.

Also, how does this work with Opaque Transactional consistency? It seems
that in the MemCachedState example we serialize the data once we store it
to the external datastore, however the data in the cache is not serialized.
why is this?
Shouldn't the local cache have the same data as the external datastore ?

thanks
-- 
Raphael Hsieh


is storm.trident LRUMap distributed among hosts?

2014-06-10 Thread Raphael Hsieh
Is the storm.trident.util.LRUMap distributed among all the hosts in the
storm cluster ?

if not, is there any way to combine this with a memcache ?

-- 
Raphael Hsieh


Re: how does PersistentAggregate distribute the DB Calls ?

2014-06-03 Thread Raphael Hsieh
Thanks for your quick reply, Nathan.
So I'm doing some debugging of my topology, and I've removed all the logic
from my multiPut function, replacing it with a single System.out.println().
Then I am monitoring my logs to check when this gets printed out.
It looks like every single one of my hosts (workers) hits this. Does this
then indicate that I am processing many, many partitions that each hit this
multiPut and print out?
Thanks.


On Tue, Jun 3, 2014 at 3:29 PM, Nathan Marz  wrote:

> When possible it will do as much aggregation Storm-side so as to minimize
> amount it needs to interact with database. So if you do a persistent global
> count, for example, it will compute the count for the batch (in parallel),
> and then the task that finishes the global count will do a single
> get/update/put to the database.
>
>
> On Tue, Jun 3, 2014 at 3:08 PM, Raphael Hsieh 
> wrote:
>
>> How does PersistentAggregate distribute the database calls across all the
>> worker nodes ?
>> Does it do the global aggregation then choose a single host to do a
>> multiget/multiput to the external db ?
>>
>> Thanks
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Twitter: @nathanmarz
> http://nathanmarz.com
>



-- 
Raphael Hsieh


how does PersistentAggregate distribute the DB Calls ?

2014-06-03 Thread Raphael Hsieh
How does PersistentAggregate distribute the database calls across all the
worker nodes ?
Does it do the global aggregation then choose a single host to do a
multiget/multiput to the external db ?

Thanks
-- 
Raphael Hsieh


Batch size never seems to increase

2014-06-02 Thread Raphael Hsieh
I'm trying to figure out why my batch size never seems to get any bigger
than 83K tuples.
It could be because this is the throughput of my spout, however I don't
believe this to be the case as I believe the spout is backing up (I'm not
processing the tuples as quickly as they are being produced)

Currently I'm just using a barebones topology that looks like this:

Stream spout = topology.newStream("...", ...)
   .parallelismHint(x)
   .groupBy(new Fields("time"))
   .aggregate(new Count(), new Fields("count"))
   .parallelismHint(x)
   .each(new Fields("time", "count"), new PrintFilter());

All the stream is doing is aggregating on like timestamps and printing out
the count.

in my config I've set batch size to 10mb like so:
Config config = new Config();
config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10);

when I have the batch size to 5mb or even 1mb there is no difference,
everything always adds up to roughly 83K tuples.
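
One thing that may be worth checking (treat this as an assumption, not a
confirmed fact): RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF is only consulted
when a plain IRichSpout is wrapped for Trident, and even there it is a tuple
count rather than a byte size; the Trident Kafka spout sizes each partition's
batch from fetchSizeBytes instead, e.g.:

TridentKafkaConfig spoutConf =
    new TridentKafkaConfig(new ZkHosts("zk1:2181"), "my-topic");   // hosts/topic are placeholders
spoutConf.fetchSizeBytes = 10 * 1024 * 1024;   // bytes fetched per partition per batch (default is 1 MB)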

In order to count up how many tuples are in the batch, I take a look at the
system timestamp of when things are printed out (in the print filter); for
all the print statements that have the same timestamp, I add the count
values together.

When I compare the system timestamp of when the batch was processed, and
the tuple timestamps (that they were aggregated on) I am falling behind.
This leads me to believe that the spout is emitting more than the number of
tuples I am processing, so there should be more than 83K tuples per batch.

If anyone has insight to this it would be greatly appreciated.
Thanks!
-- 
Raphael Hsieh


Re: Optimizing Kafka Stream

2014-06-02 Thread Raphael Hsieh
Oh ok. Thanks Chi!
Do you have any ideas about why my batch size never seems to get any bigger
than 83K tuples ?
Currently I'm just using a barebones topology that looks like this:

Stream spout = topology.newStream("...", ...)
   .parallelismHint(x)
   .groupBy(new Fields("time"))
   .aggregate(new Count(), new Fields("count"))
   .parallelismHint(x)
   .each(new Fields("time", "count"), new PrintFilter());

All the stream is doing is aggregating on like timestamps and printing out
the count.

in my config I've set batch size to 10mb like so:
Config config = new Config();
config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10);

when I have the batch size to 5mb or even 1mb there is no difference,
everything always adds up to roughly 83K tuples.

In order to count up how many tuples are in the batch, I take a look at the
system timestamp of when things are printed out (in the print filter); for
all the print statements that have the same timestamp, I add the count
values together.

On Mon, Jun 2, 2014 at 10:09 AM, Chi Hoang  wrote:

> Raphael,
> The number of partitions is defined in your Kafka configuration -
> http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions)
> - or when you create the topic.  The behavior is different for each version
> of Kafka, so you should read more documentation.  Your topology needs to
> match the Kafka configuration for the topic.
>
> Chi
>
>
> On Mon, Jun 2, 2014 at 8:46 AM, Raphael Hsieh 
> wrote:
>
>> Thanks for the tips Chi,
>> I'm a little confused about the partitioning. I had thought that the
>> number of partitions was determined by the amount of parallelism in the
>> topology. For example if I said .parallelismHint(4), then I would have 4
>> different partitions. Is this not the case ?
>> Is there a set number of partitions my topology has that I need to
>> increase in order to have higher parallelism ?
>>
>> Thanks
>>
>>
>> On Sat, May 31, 2014 at 11:50 AM, Chi Hoang  wrote:
>>
>>> Raphael,
>>> You can try tuning your parallelism (and num workers).
>>>
>>> For Kafka 0.7, your spout parallelism could max out at: # brokers x #
>>> partitions (for the topic).  If you have 4 Kafka brokers, and your topic
>>> has 5 partitions, then you could set the spout parallelism to 20 to
>>> maximize the throughput.
>>>
>>> For Kafka 0.8+, your spout parallelism could max out at # partitions for
>>> the topic, so if your topic has 5 partitions, then you would set the spout
>>> parallelism to 5.  To increase parallelism, you would need to increase the
>>> number of partitions for your topic (by using the add partitions utility).
>>>
>>> As for the number of workers, setting it to 1 means that your spout will
>>> only run on a single Storm node, and would likely share resources with
>>> other Storm processes (spouts and bolts).  I recommend to increase the
>>> number of workers so Storm has a chance to spread out the work, and keep a
>>> good balance.
>>>
>>> Hope this helps.
>>>
>>> Chi
>>>
>>>
>>> On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh 
>>> wrote:
>>>
>>>> I am in the process of optimizing my stream. Currently I expect 5 000
>>>> 000 tuples to come out of my spout per minute. I am trying to beef up my
>>>> topology in order to process this in real time without falling behind.
>>>>
>>>> For some reason my batch size is capping out at 83 thousand tuples. I
>>>> can't seem to make it any bigger. the processing time doesn't seem to get
>>>> any smaller than 2-3 seconds either.
>>>> I'm not sure how to configure the topology to get any faster / more
>>>> efficient.
>>>>
>>>> Currently all the topology does is a groupby on time and an aggregation
>>>> (Count) to aggregate everything.
>>>>
>>>> Here are some data points i've figured out.
>>>>
>>>> Batch Size:5mb
>>>> num-workers: 1
>>>> parallelismHint: 2
>>>> (I'll write this as 5mb, 1, 2)
>>>>
>>>> 5mb, 1, 2 = 83K tuples / 6s
>>>> 10mb, 1, 2 = 83k / 7s
>>>> 5mb, 1, 4 = 83k / 6s
>>>> 5mb, 2, 4 = 83k / 3s
>>>> 5mb, 3, 6 = 83k / 3s
>>>> 10mb, 3, 6 = 83k / 3s
>>>>
>>>> Can anybody help me figure out how to get it to process things faster ?
>>>>
>>>> My maxSpoutPending is at 1, but when I increased it to 2 it was the
>>>> same. MessageTimeoutSec = 100
>>>>
>>>> I've been following this blog: https://gist.github.com/mrflip/5958028
>>>> to an extent, not everything word for word though.
>>>>
>>>> I need to be able to process around 66,000 tuples per second and I'm
>>>> starting to run out of ideas.
>>>>
>>>> Thanks
>>>>
>>>> --
>>>> Raphael Hsieh
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Data Systems Engineering
> data-syst...@groupon.com
>



-- 
Raphael Hsieh


Re: Optimizing Kafka Stream

2014-06-02 Thread Raphael Hsieh
Thanks for the tips Chi,
I'm a little confused about the partitioning. I had thought that the number
of partitions was determined by the amount of parallelism in the topology.
For example if I said .parallelismHint(4), then I would have 4 different
partitions. Is this not the case ?
Is there a set number of partitions my topology has that I need to increase
in order to have higher parallelism ?
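
To make the relationship concrete, a sketch with the storm-kafka Trident spout
(hosts, topic, and the partition count of 5 are made up): the partition count
is a property of the Kafka topic, and parallelismHint() only controls how many
executors consume those partitions.

import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

TridentTopology topology = new TridentTopology();
TridentKafkaConfig spoutConf =
    new TridentKafkaConfig(new ZkHosts("zk1:2181"), "TopicA");
topology.newStream("kafka-spout", new OpaqueTridentKafkaSpout(spoutConf))
    // executors beyond the topic's partition count (assumed 5 here) sit idle
    .parallelismHint(5);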

Thanks


On Sat, May 31, 2014 at 11:50 AM, Chi Hoang  wrote:

> Raphael,
> You can try tuning your parallelism (and num workers).
>
> For Kafka 0.7, your spout parallelism could max out at: # brokers x #
> partitions (for the topic).  If you have 4 Kafka brokers, and your topic
> has 5 partitions, then you could set the spout parallelism to 20 to
> maximize the throughput.
>
> For Kafka 0.8+, your spout parallelism could max out at # partitions for
> the topic, so if your topic has 5 partitions, then you would set the spout
> parallelism to 5.  To increase parallelism, you would need to increase the
> number of partitions for your topic (by using the add partitions utility).
>
> As for the number of workers, setting it to 1 means that your spout will
> only run on a single Storm node, and would likely share resources with
> other Storm processes (spouts and bolts).  I recommend to increase the
> number of workers so Storm has a chance to spread out the work, and keep a
> good balance.
>
> Hope this helps.
>
> Chi
>
>
> On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh 
> wrote:
>
>> I am in the process of optimizing my stream. Currently I expect 5 000 000
>> tuples to come out of my spout per minute. I am trying to beef up my
>> topology in order to process this in real time without falling behind.
>>
>> For some reason my batch size is capping out at 83 thousand tuples. I
>> can't seem to make it any bigger. the processing time doesn't seem to get
>> any smaller than 2-3 seconds either.
>> I'm not sure how to configure the topology to get any faster / more
>> efficient.
>>
>> Currently all the topology does is a groupby on time and an aggregation
>> (Count) to aggregate everything.
>>
>> Here are some data points i've figured out.
>>
>> Batch Size:5mb
>> num-workers: 1
>> parallelismHint: 2
>> (I'll write this as 5mb, 1, 2)
>>
>> 5mb, 1, 2 = 83K tuples / 6s
>> 10mb, 1, 2 = 83k / 7s
>> 5mb, 1, 4 = 83k / 6s
>> 5mb, 2, 4 = 83k / 3s
>> 5mb, 3, 6 = 83k / 3s
>> 10mb, 3, 6 = 83k / 3s
>>
>> Can anybody help me figure out how to get it to process things faster ?
>>
>> My maxSpoutPending is at 1, but when I increased it to 2 it was the same.
>> MessageTimeoutSec = 100
>>
>> I've been following this blog: https://gist.github.com/mrflip/5958028
>> to an extent, not everything word for word though.
>>
>> I need to be able to process around 66,000 tuples per second and I'm
>> starting to run out of ideas.
>>
>> Thanks
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>
>
>


-- 
Raphael Hsieh


Optimizing Kafka Stream

2014-05-30 Thread Raphael Hsieh
I am in the process of optimizing my stream. Currently I expect 5 000 000
tuples to come out of my spout per minute. I am trying to beef up my
topology in order to process this in real time without falling behind.

For some reason my batch size is capping out at 83 thousand tuples. I can't
seem to make it any bigger. the processing time doesn't seem to get any
smaller than 2-3 seconds either.
I'm not sure how to configure the topology to get any faster / more
efficient.

Currently all the topology does is a groupby on time and an aggregation
(Count) to aggregate everything.

Here are some data points i've figured out.

Batch Size: 5mb
num-workers: 1
parallelismHint: 2
(I'll write this as 5mb, 1, 2)

5mb, 1, 2 = 83K tuples / 6s
10mb, 1, 2 = 83k / 7s
5mb, 1, 4 = 83k / 6s
5mb, 2, 4 = 83k / 3s
5mb, 3, 6 = 83k / 3s
10mb, 3, 6 = 83k / 3s

Can anybody help me figure out how to get it to process things faster ?

My maxSpoutPending is at 1, but when I increased it to 2 it was the same.
MessageTimeoutSec = 100

I've been following this blog: https://gist.github.com/mrflip/5958028
to an extent, not everything word for word though.

I need to be able to process around 66,000 tuples per second and I'm
starting to run out of ideas.

Thanks

-- 
Raphael Hsieh


Re: Position in Kafka Stream

2014-05-29 Thread Raphael Hsieh
Thanks Tyson!
This blog is super helpful.
I've been able to get LoggingMetrics working to an extent, however if I try
to create multiple CountMetrics in the same function, I only see one show
up in my NimbusUI. Does anybody know why this is ?


On Thu, May 29, 2014 at 8:57 AM, Tyson Norris  wrote:

>  I found this blog helpful:
> http://www.bigdata-cookbook.com/post/72320512609/storm-metrics-how-to
>
>  Best regards,
> Tyson
>
>  On May 29, 2014, at 8:41 AM, Raphael Hsieh  wrote:
>
>  Can someone explain to me what LoggingMetrics is ?
> I've heard of it and people have told me to use it, but I can't find any
> documentation on it or any resources on how to use it.
>
> Thanks
>
>
> On Thu, May 29, 2014 at 12:06 AM, Tyson Norris  wrote:
>
>> Hi -
>> Thanks - it turns out that the JSON parsing is actually fine with HEAD,
>> although inaccurate without the required message format (comments mention
>> expecting an “s” property with timestamp value).
>>
>>  My problem was that I was not specifying the spout root properly,
>> i.e. --spoutroot /transactional//user/  (in my case I had
>> specified a path that was valid, but not a spout)
>>
>>  Now I get offset info properly via monitor.py - Thanks!
>>
>>  Tyson
>>
>>  On May 28, 2014, at 10:12 AM, Cody A. Ray  wrote:
>>
>>  Right, its trying to read your kafka messages and parse as JSON.  See
>> the error:
>>
>>  simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1
>> (char 0)
>>
>>  If you want to use the BrightTag branch, you'll need to go a couple
>> commits back. Try this:
>>
>>  git clone https://github.com/BrightTag/stormkafkamon
>>  git checkout 07eede9ec72329fe2cad893d087541b583e11148
>>
>>  -Cody
>>
>>
>> On Wed, May 28, 2014 at 10:39 AM, Tyson Norris  wrote:
>>
>>> Thanks Cody -
>>> I tried the BrightTag fork and still have problems with
>>> storm 0.9.1-incubating and kafka 0.8.1, I get an error with my trident
>>> topology (haven’t tried non-trident yet):
>>> (venv)tnorris-osx:stormkafkamon tnorris$ ./monitor.py --topology
>>> TrendingTagTopology --spoutroot storm --friendly
>>> Traceback (most recent call last):
>>>   File "./monitor.py", line 112, in 
>>> sys.exit(main())
>>>   File "./monitor.py", line 96, in main
>>> zk_data = process(zc.spouts(options.spoutroot, options.topology))
>>>   File "/git/github/stormkafkamon/stormkafkamon/zkclient.py", line 76,
>>> in spouts
>>> j = json.loads(self.client.get(self._zjoin([spout_root, c, p]))[0])
>>>   File
>>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/__init__.py",
>>> line 501, in loads
>>> return _default_decoder.decode(s)
>>>   File
>>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py",
>>> line 370, in decode
>>> obj, end = self.raw_decode(s)
>>>   File
>>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py",
>>> line 389, in raw_decode
>>> return self.scan_once(s, idx=_w(s, idx).end())
>>> simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1
>>> (char 0)
>>> (venv)tnorris-osx:stormkafkamon tnorris$
>>>
>>>  I’m not too familiar with python but will try to debug it as time
>>> allows - let me know if you have advice.
>>>
>>>  Thanks
>>>  Tyson
>>>
>>>
>>>
>>>
>>>  On May 28, 2014, at 7:20 AM, Cody A. Ray  wrote:
>>>
>>>  You can also use stormkafkamon to track this stuff. Its not good for
>>> historical analysis like graphite/ganglia, but its good if you just want to
>>> see how things currently stand.
>>>
>>>  The original: https://github.com/otoolep/stormkafkamon
>>>
>>>  This didn't work for us without some updates (incompatibility with the
>>> latest python-kafka dep). Here are those updates:
>>> https://github.com/BrightTag/stormkafkamon/commit/07eede9ec72329fe2cad893d087541b583e11148
>>>
>>>  (Our branch has a couple more things that parse the kafka messages
>>> with our format (which embeds a timestamp) to determine how long (in time)
>>> storm is behind... planning to clean that up soon so it can be a bit more
>>> reusable)
>>>
>>>  https://github.com/BrightTag/stormkafkamon
>>>
>>>  -Cody
>>>

Re: Nimbus UI fields

2014-05-29 Thread Raphael Hsieh
what might cause a tuple to be 'Acked' vs. just 'Executed'?
How should I interpret these values ?

Thanks


On Tue, May 20, 2014 at 9:20 PM, Cody A. Ray  wrote:

> The two bolts which emit/transfer 0 are likely your persistentAggregate
> bolts. These are *sinks* so they don't logically emit/transfer tuples any
> farther.
>
> You can add add a name which will show up in the UI to help you see how
> Trident compiles into your Storm topology.
>
> .name("Aggregator 1")
> .persistentAggregate(...)
>
> .name("Aggregator 2")
> .persistentAggregate(...)
>
> -Cody
>
>
> On Tue, May 20, 2014 at 7:38 PM, Harsha  wrote:
>
>>  Executed refers to number of incoming tuples processed.
>>
>> capacity is determined by (executed * latency) / window (time duration).
>>
>> UI should give you description of those stats if you hover over table
>> headers.
>>
>>
>>
>>
>> On Tue, May 20, 2014, at 03:36 PM, Raphael Hsieh wrote:
>>
>> I reattached the previous image in case it was too difficult to read
>> before
>>
>>
>> On Tue, May 20, 2014 at 3:31 PM, Raphael Hsieh 
>> wrote:
>>
>> Hi I'm confused as to what each field in the StormUI represents and how
>> to use the information.
>> [image: Inline image 1]
>>
>> The bolts I have above are formed from trident. This is what operations I
>> believe each bolt represents
>>  b-0 : .each(function) -> .each(filter)
>> b-1 : .aggregate
>> --split--
>> b-2 : .persistentAggregate
>> b-3 : .persistentAggregate
>>
>> What does it mean for the first two bolts to emit and transfer 0 ?
>>  What is the Capacity field ? What does that represent ?
>> Does Execute refer to the tuples acked and successfully processed?
>>
>> Thanks
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>> Email had 2 attachments:
>>
>>- image.png
>>-   41k (image/png)
>>- NimbusUI.PNG
>>-   22k (image/png)
>>
>>
>
>
> --
> Cody A. Ray, LEED AP
> cody.a@gmail.com
> 215.501.7891
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Re: Position in Kafka Stream

2014-05-29 Thread Raphael Hsieh
Can someone explain to me what LoggingMetrics is ?
I've heard of it and people have told me to use it, but I can't find any
documentation on it or any resources on how to use it.

Thanks


On Thu, May 29, 2014 at 12:06 AM, Tyson Norris  wrote:

>  Hi -
> Thanks - it turns out that the JSON parsing is actually fine with HEAD,
> although inaccurate without the required message format (comments mention
> expecting an “s” property with timestamp value).
>
>  My problem was that I was not specifying the spout root properly,
> i.e. --spoutroot /transactional//user/  (in my case I had
> specified a path that was valid, but not a spout)
>
>  Now I get offset info properly via monitor.py - Thanks!
>
>  Tyson
>
>  On May 28, 2014, at 10:12 AM, Cody A. Ray  wrote:
>
>  Right, its trying to read your kafka messages and parse as JSON.  See
> the error:
>
>  simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1
> (char 0)
>
>  If you want to use the BrightTag branch, you'll need to go a couple
> commits back. Try this:
>
>  git clone https://github.com/BrightTag/stormkafkamon
>  git checkout 07eede9ec72329fe2cad893d087541b583e11148
>
>  -Cody
>
>
> On Wed, May 28, 2014 at 10:39 AM, Tyson Norris  wrote:
>
>> Thanks Cody -
>> I tried the BrightTag fork and still have problems with
>> storm 0.9.1-incubating and kafka 0.8.1, I get an error with my trident
>> topology (haven’t tried non-trident yet):
>> (venv)tnorris-osx:stormkafkamon tnorris$ ./monitor.py --topology
>> TrendingTagTopology --spoutroot storm --friendly
>> Traceback (most recent call last):
>>   File "./monitor.py", line 112, in 
>> sys.exit(main())
>>   File "./monitor.py", line 96, in main
>> zk_data = process(zc.spouts(options.spoutroot, options.topology))
>>   File "/git/github/stormkafkamon/stormkafkamon/zkclient.py", line 76, in
>> spouts
>> j = json.loads(self.client.get(self._zjoin([spout_root, c, p]))[0])
>>   File
>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/__init__.py",
>> line 501, in loads
>> return _default_decoder.decode(s)
>>   File
>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py",
>> line 370, in decode
>> obj, end = self.raw_decode(s)
>>   File
>> "/git/github/stormkafkamon/venv/lib/python2.7/site-packages/simplejson/decoder.py",
>> line 389, in raw_decode
>> return self.scan_once(s, idx=_w(s, idx).end())
>> simplejson.scanner.JSONDecodeError: Expecting value: line 1 column 1
>> (char 0)
>> (venv)tnorris-osx:stormkafkamon tnorris$
>>
>>  I’m not too familiar with python but will try to debug it as time
>> allows - let me know if you have advice.
>>
>>  Thanks
>>  Tyson
>>
>>
>>
>>
>>  On May 28, 2014, at 7:20 AM, Cody A. Ray  wrote:
>>
>>  You can also use stormkafkamon to track this stuff. Its not good for
>> historical analysis like graphite/ganglia, but its good if you just want to
>> see how things currently stand.
>>
>>  The original: https://github.com/otoolep/stormkafkamon
>>
>>  This didn't work for us without some updates (incompatibility with the
>> latest python-kafka dep). Here are those updates:
>> https://github.com/BrightTag/stormkafkamon/commit/07eede9ec72329fe2cad893d087541b583e11148
>>
>>  (Our branch has a couple more things that parse the kafka messages with
>> our format (which embeds a timestamp) to determine how long (in time) storm
>> is behind... planning to clean that up soon so it can be a bit more
>> reusable)
>>
>>  https://github.com/BrightTag/stormkafkamon
>>
>>  -Cody
>>
>>
>> On Wed, May 28, 2014 at 4:50 AM, Danijel Schiavuzzi <
>> dani...@schiavuzzi.com> wrote:
>>
>>> Yes, Trident Kafka spouts give you the same metrics. Take a look at the
>>> code to find out what's available.
>>>
>>>
>>> On Wed, May 28, 2014 at 3:55 AM, Tyson Norris  wrote:
>>>
>>>> Do Trident variants of kafka spouts do something similar?
>>>> Thanks
>>>> Tyson
>>>>
>>>> > On May 27, 2014, at 3:19 PM, "Harsha"  wrote:
>>>> >
>>>> > Raphael,
>>>> >kafka spout sends metrics for kafkaOffset and kafkaPartition
>>>> you can look at those by using LoggingMetrics or setting up a ganglia.
>>>> Kafka uses its own zookeeper to store state info per topic & group.id
>>>

Re: Trident, ZooKeeper and Kafka

2014-05-29 Thread Raphael Hsieh
By setting forceFromStart to true, aren't I telling it to start from the
beginning or earliest time then ?


On Thu, May 29, 2014 at 12:59 AM, Danijel Schiavuzzi  wrote:

> You must set both forceFromStart to true and startOffsetTime to -1 or -2.
>
>
> On Thu, May 29, 2014 at 12:23 AM, Raphael Hsieh 
> wrote:
>
>> I'm doing both tridentKafkaConfig.forceFromStart = false; as well as
>> tridentKafkaConfig.startOffsetTime = -1;
>>
>> Neither are working for me. Looking at my nimbus UI, I still get a large
>> spike in processed data, before it levels off and seems to not process
>> anything.
>>
>>
>>
>> On Wed, May 28, 2014 at 3:17 PM, Shaikh Riyaz 
>> wrote:
>>
>>> I think you can use  kafkaConfig.forceFromStart = *false*;
>>>
>>> We have implemented this and its working fine.
>>>
>>> Regards,
>>> Riyaz
>>>
>>>
>>>
>>> On Thu, May 29, 2014 at 1:02 AM, Raphael Hsieh 
>>> wrote:
>>>
>>>> This is still not working for me. I've set the offset to -1 and it is
>>>> still backfilling data.
>>>> Is there any documentation on the start offsets that I could take a
>>>> look at ?
>>>> Or even documentation on kafka.api.OffsetRequest.LatestTime() ?
>>>>
>>>>
>>>> On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh 
>>>> wrote:
>>>>
>>>>> would the Trident version of this be
>>>>> tridentKafkaConfig.startOffsetTime ?
>>>>>
>>>>>
>>>>> On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi <
>>>>> dani...@schiavuzzi.com> wrote:
>>>>>
>>>>>> By default, the Kafka spout resumes consuming where it last left off.
>>>>>> That offset is stored in ZooKeeper.
>>>>>>
>>>>>> You can set forceStartOffset to -2 to start consuming from the
>>>>>> earliest available offset, or -1 to start consuming from the latest
>>>>>> available offset.
>>>>>>
>>>>>>
>>>>>> On Wednesday, May 28, 2014, Raphael Hsieh 
>>>>>> wrote:
>>>>>>
>>>>>>> If I don't tell trident to start consuming data from the beginning
>>>>>>> of the Kafka stream, where does it start from?
>>>>>>> If I were to do:
>>>>>>>tridentKafkaConfig.forceFromStart = true;
>>>>>>> Then it will tell the spout to start consuming from the start of the
>>>>>>> stream. If that is not set, then where does it start consuming from? and
>>>>>>> How might I go about telling it to start consuming from the very end of 
>>>>>>> the
>>>>>>> stream?
>>>>>>>
>>>>>>> If a disaster were to happen and all my hosts died, when I start my
>>>>>>> cluster back up, it might start consuming from where it left off. I 
>>>>>>> would
>>>>>>> rather manually process that old data, and have my storm system start
>>>>>>> processing the live data.
>>>>>>>
>>>>>>> Thanks
>>>>>>> --
>>>>>>> Raphael Hsieh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Danijel Schiavuzzi
>>>>>>
>>>>>> E: dani...@schiavuzzi.com
>>>>>> W: www.schiavuzzi.com
>>>>>> T: +385989035562
>>>>>> Skype: danijels7
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Raphael Hsieh
>>>>> Amazon.com
>>>>> Software Development Engineer I
>>>>> (978) 764-9014
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Raphael Hsieh
>>>> Amazon.com
>>>> Software Development Engineer I
>>>> (978) 764-9014
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>>
>>> Riyaz
>>>
>>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>
>
>
> --
> Danijel Schiavuzzi
>
> E: dani...@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Re: Different ZooKeeper Cluster for storm and kafka ?

2014-05-28 Thread Raphael Hsieh
Never mind I figured this out.

Thanks
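
For anyone hitting this later, the piece that matters is roughly the following
(a sketch; host names are made up): the spout's ZkHosts points at the
ZooKeeper ensemble that Kafka registers its brokers in, which is independent
of the storm.zookeeper.servers setting in storm.yaml that the Storm cluster
itself runs against.

import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaConfig;

// Kafka's ZooKeeper (broker metadata), unrelated to Storm's own ZooKeeper quorum
BrokerHosts kafkaZk = new ZkHosts("kafka-zk1:2181,kafka-zk2:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(kafkaZk, "my-topic");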


On Wed, May 28, 2014 at 3:59 PM, Raphael Hsieh  wrote:

> Hi I believe it is possible to have my Storm topology run on a different
> ZooKeeper cluster than the source of my data (this case being Kafka). I
> cannot seem to find documentation on how to do this?
>
> Can anybody help me figure this out? Or point me to some docs that will
> explain this?
>
> Thanks
> --
> Raphael Hsieh
>
>
>
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Different ZooKeeper Cluster for storm and kafka ?

2014-05-28 Thread Raphael Hsieh
Hi I believe it is possible to have my Storm topology run on a different
ZooKeeper cluster than the source of my data (this case being Kafka). I
cannot seem to find documentation on how to do this?

Can anybody help me figure this out? Or point me to some docs that will
explain this?

Thanks
-- 
Raphael Hsieh


Re: Trident, ZooKeeper and Kafka

2014-05-28 Thread Raphael Hsieh
I'm doing both tridentKafkaConfig.forceFromStart = false; as well as
tridentKafkaConfig.startOffsetTime = -1;

Neither are working for me. Looking at my nimbus UI, I still get a large
spike in processed data, before it levels off and seems to not process
anything.



On Wed, May 28, 2014 at 3:17 PM, Shaikh Riyaz  wrote:

> I think you can use  kafkaConfig.forceFromStart = *false*;
>
> We have implemented this and its working fine.
>
> Regards,
> Riyaz
>
>
>
> On Thu, May 29, 2014 at 1:02 AM, Raphael Hsieh wrote:
>
>> This is still not working for me. I've set the offset to -1 and it is
>> still backfilling data.
>> Is there any documentation on the start offsets that I could take a look
>> at ?
>> Or even documentation on kafka.api.OffsetRequest.LatestTime() ?
>>
>>
>> On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh wrote:
>>
>>> would the Trident version of this be
>>> tridentKafkaConfig.startOffsetTime ?
>>>
>>>
>>> On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi <
>>> dani...@schiavuzzi.com> wrote:
>>>
>>>> By default, the Kafka spout resumes consuming where it last left off.
>>>> That offset is stored in ZooKeeper.
>>>>
>>>> You can set forceStartOffset to -2 to start consuming from the earliest
>>>> available offset, or -1 to start consuming from the latest available 
>>>> offset.
>>>>
>>>>
>>>> On Wednesday, May 28, 2014, Raphael Hsieh  wrote:
>>>>
>>>>> If I don't tell trident to start consuming data from the beginning of
>>>>> the Kafka stream, where does it start from?
>>>>> If I were to do:
>>>>>tridentKafkaConfig.forceFromStart = true;
>>>>> Then it will tell the spout to start consuming from the start of the
>>>>> stream. If that is not set, then where does it start consuming from? and
>>>>> How might I go about telling it to start consuming from the very end of 
>>>>> the
>>>>> stream?
>>>>>
>>>>> If a disaster were to happen and all my hosts died, when I start my
>>>>> cluster back up, it might start consuming from where it left off. I would
>>>>> rather manually process that old data, and have my storm system start
>>>>> processing the live data.
>>>>>
>>>>> Thanks
>>>>> --
>>>>> Raphael Hsieh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Danijel Schiavuzzi
>>>>
>>>> E: dani...@schiavuzzi.com
>>>> W: www.schiavuzzi.com
>>>> T: +385989035562
>>>> Skype: danijels7
>>>>
>>>
>>>
>>>
>>> --
>>> Raphael Hsieh
>>> Amazon.com
>>> Software Development Engineer I
>>> (978) 764-9014
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>> Raphael Hsieh
>> Amazon.com
>> Software Development Engineer I
>> (978) 764-9014
>>
>>
>>
>>
>
>
>
> --
> Regards,
>
> Riyaz
>
>


-- 
Raphael Hsieh


Re: logging 'failed' tuples in mastercoord-bg0

2014-05-28 Thread Raphael Hsieh
Thanks for your help, Taylor,
Do you think you could point me to some documentation on where I can set
those values in Storm Trident? I can't seem to find anything or figure that
out.
Thanks
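
In case it helps anyone searching later: as far as I know these are plain
topology config values, so they go on the backtype.storm.Config passed to
StormSubmitter rather than anywhere in the Trident API itself. A sketch
(the topology name and variable are placeholders):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import storm.trident.TridentTopology;

public class SubmitWithTimeouts {
    public static void submit(TridentTopology topology) throws Exception {
        Config conf = new Config();
        // topology.message.timeout.secs: batch/tuple timeout, 30s by default.
        conf.setMessageTimeoutSecs(120);
        // topology.max.spout.pending: in Trident this caps how many batches
        // can be in flight at once.
        conf.setMaxSpoutPending(1);
        StormSubmitter.submitTopology("my-topology", conf, topology.build());
    }
}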


On Wed, May 28, 2014 at 2:58 PM, P. Taylor Goetz  wrote:

> "Silent replays" are usually a sign of batches timing out.
>
> By default storm uses a timeout value of thirty seconds.
>
> Try upping that value and setting TOPOLOGY_SPOUT_MAX_PENDING to a very low
> value like 1. In trident that controls how many batches can be in-flight at
> a time.
>
> -Taylor
>
> > On May 28, 2014, at 5:24 PM, Raphael Hsieh  wrote:
> >
> > Is there a way to add logging to the master coordinator?
> > For some reason my spout is failing batches and not giving me any error
> messages.
> > The log files on the host don't provide any error messages and I'm not
> sure where the logic for this resides in Storm-Trident.
> >
> > Is there a particular string other than 'failed' that I can grep for?
> >
> > Thanks
> > --
> > Raphael Hsieh
> >
> >
>



-- 
Raphael Hsieh


Re: Trident, ZooKeeper and Kafka

2014-05-28 Thread Raphael Hsieh
This is still not working for me. I've set the offset to -1 and it is still
backfilling data.
Is there any documentation on the start offsets that I could take a look at
?
Or even documentation on kafka.api.OffsetRequest.LatestTime() ?


On Wed, May 28, 2014 at 1:01 PM, Raphael Hsieh  wrote:

> would the Trident version of this be
> tridentKafkaConfig.startOffsetTime ?
>
>
> On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi <
> dani...@schiavuzzi.com> wrote:
>
>> By default, the Kafka spout resumes consuming where it last left off.
>> That offset is stored in ZooKeeper.
>>
>> You can set forceStartOffset to -2 to start consuming from the earliest
>> available offset, or -1 to start consuming from the latest available offset.
>>
>>
>> On Wednesday, May 28, 2014, Raphael Hsieh  wrote:
>>
>>> If I don't tell trident to start consuming data from the beginning of
>>> the Kafka stream, where does it start from?
>>> If I were to do:
>>>tridentKafkaConfig.forceFromStart = true;
>>> Then it will tell the spout to start consuming from the start of the
>>> stream. If that is not set, then where does it start consuming from? and
>>> How might I go about telling it to start consuming from the very end of the
>>> stream?
>>>
>>> If a disaster were to happen and all my hosts died, when I start my
>>> cluster back up, it might start consuming from where it left off. I would
>>> rather manually process that old data, and have my storm system start
>>> processing the live data.
>>>
>>> Thanks
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Danijel Schiavuzzi
>>
>> E: dani...@schiavuzzi.com
>> W: www.schiavuzzi.com
>> T: +385989035562
>> Skype: danijels7
>>
>
>
>
> --
> Raphael Hsieh
> Amazon.com
> Software Development Engineer I
> (978) 764-9014
>
>
>
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


logging 'failed' tuples in mastercoord-bg0

2014-05-28 Thread Raphael Hsieh
Is there a way to add logging to the master coordinator?
For some reason my spout is failing batches and not giving me any error
messages.
The log files on the host don't provide any error messages and I'm not sure
where the logic for this resides in Storm-Trident.

Is there a particular string other than 'failed' that I can grep for?

Thanks
-- 
Raphael Hsieh


Specify a different ZooKeeper Cluster

2014-05-28 Thread Raphael Hsieh
When using Storm Trident with a kafka spout, I need to give it a ZooKeeper
cluster and the broker path so that it can know where to find the Kafka
hosts and get data. How would I go about specifying a different ZooKeeper
cluster for storm to use?


Thanks
-- 
Raphael Hsieh


Re: Trident, ZooKeeper and Kafka

2014-05-28 Thread Raphael Hsieh
would the Trident version of this be
tridentKafkaConfig.startOffsetTime ?


On Wed, May 28, 2014 at 12:23 PM, Danijel Schiavuzzi  wrote:

> By default, the Kafka spout resumes consuming where it last left off. That
> offset is stored in ZooKeeper.
>
> You can set forceStartOffset to -2 to start consuming from the earliest
> available offset, or -1 to start consuming from the latest available offset.
>
>
> On Wednesday, May 28, 2014, Raphael Hsieh  wrote:
>
>> If I don't tell trident to start consuming data from the beginning of the
>> Kafka stream, where does it start from?
>> If I were to do:
>>tridentKafkaConfig.forceFromStart = true;
>> Then it will tell the spout to start consuming from the start of the
>> stream. If that is not set, then where does it start consuming from? and
>> How might I go about telling it to start consuming from the very end of the
>> stream?
>>
>> If a disaster were to happen and all my hosts died, when I start my
>> cluster back up, it might start consuming from where it left off. I would
>> rather manually process that old data, and have my storm system start
>> processing the live data.
>>
>> Thanks
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
> --
> Danijel Schiavuzzi
>
> E: dani...@schiavuzzi.com
> W: www.schiavuzzi.com
> T: +385989035562
> Skype: danijels7
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Trident, ZooKeeper and Kafka

2014-05-28 Thread Raphael Hsieh
If I don't tell trident to start consuming data from the beginning of the
Kafka stream, where does it start from?
If I were to do:
   tridentKafkaConfig.forceFromStart = true;
Then it will tell the spout to start consuming from the start of the
stream. If that is not set, then where does it start consuming from? and
How might I go about telling it to start consuming from the very end of the
stream?

If a disaster were to happen and all my hosts died, when I start my cluster
back up, it might start consuming from where it left off. I would rather
manually process that old data, and have my storm system start processing
the live data.

Thanks
-- 
Raphael Hsieh


Position in Kafka Stream

2014-05-27 Thread Raphael Hsieh
Is there a way to tell where in the kafka stream my topology is starting
from?
From my understanding, Storm will use ZooKeeper in order to tell its place
in the Kafka stream. Where can I find metrics on this?
How can I see how large the stream is, how much data is sitting in the
stream, and what the most recent/oldest positions are?

Thanks

-- 
Raphael Hsieh


Batches per second

2014-05-27 Thread Raphael Hsieh
Is there a way to tell how many batches per second are being processed by
my topology?

Thanks
-- 
Raphael Hsieh


PersistentAggregate

2014-05-27 Thread Raphael Hsieh
From my understanding, PersistentAggregate should first aggregate the
batch, then once the batch has finished aggregating, send it to whatever
datastore is specified.

Is this the case ? Or will the Persistent Aggregate use the external
datastore in order to do the aggregations ?

-- 
Raphael Hsieh


Re: $mastercoord-bg0

2014-05-22 Thread Raphael Hsieh
Also, is there any way to look at logs for this master coordinator spout?
I'd like to be able to see what exactly is failing. Currently my
mastercoord-bg0 spout says that there are 20 things failing, but there are
no error messages displayed, and I don't see anything when I SSH into the
box and look at the logs there.


On Wed, May 21, 2014 at 4:47 PM, Raphael Hsieh  wrote:

> what does the $mastercoord-bg0 represent ? It seems to have much less work
> that my bolt spout. Also how can I set the parallelism of this master spout
> ?
>
> when my other bolts are emitting and acking millions of tuples, this
> master spout only emits/acks a couple dozen.
>
> Also currently something in my code is broken and the mastercoord spout is
> failing 20, and nothing is being passed through.
>
> Is this mastercoord-bg0 spout acking batches ? How might I go about
> troubleshooting this to figure out why it is broken ?
>
> Thanks
> --
> Raphael Hsieh
>
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


$mastercoord-bg0

2014-05-21 Thread Raphael Hsieh
What does the $mastercoord-bg0 represent? It seems to have much less work
than my bolt spout. Also, how can I set the parallelism of this master spout?

when my other bolts are emitting and acking millions of tuples, this master
spout only emits/acks a couple dozen.

Also currently something in my code is broken and the mastercoord spout is
failing 20, and nothing is being passed through.

Is this mastercoord-bg0 spout acking batches ? How might I go about
troubleshooting this to figure out why it is broken ?

Thanks
-- 
Raphael Hsieh


Nimbus UI fields

2014-05-20 Thread Raphael Hsieh
Hi, I'm confused as to what each field in the Storm UI represents and how to
use the information.
[image: Inline image 1]

The bolts I have above are formed from Trident. These are the operations I
believe each bolt represents:
b-0 : .each(function) -> .each(filter)
b-1 : .aggregate
--split--
b-2 : .persistentAggregate
b-3 : .persistentAggregate

What does it mean for the first two bolts to emit and transfer 0 ?
What is the Capacity field ? What does that represent ?
Does Execute refer to the tuples acked and successfully processed?

Thanks
-- 
Raphael Hsieh


Are batches processed sequentially or in parallel?

2014-05-13 Thread Raphael Hsieh
In Storm Trident are batches processed sequentially?  Or are they all
processed in parallel?
If they are processed in parallel how does it handle multiple writers to a
datastore ?

I can understand this making sense for append-only implementations, but for
cases where we are updating values in a database, how does it make sure
that a value is written to the database before another thread reads it
and tries to update it with different data?

Thanks
-- 
Raphael Hsieh


Re: Memcached

2014-04-28 Thread Raphael Hsieh
Also,
How fault tolerant is using the MemcachedState? If a host/worker node dies,
does its in memory map get lost forever ? Or is this map distributed among
worker nodes ?


On Mon, Apr 28, 2014 at 3:57 PM, Raphael Hsieh  wrote:

> How would one pull data from a Memcached in a separate process ?
>
> I see the examples for how to put data into memcached, but what is the
> purpose of doing so ?
>
> How might I have a different process pulling data from the memcache every
> X seconds, and pushing it to a different external datastore ?
>
> --
> Raphael Hsieh
>
>
>
>



-- 
Raphael Hsieh


Memcached

2014-04-28 Thread Raphael Hsieh
How would one pull data from a Memcached in a separate process ?

I see the examples for how to put data into memcached, but what is the
purpose of doing so ?

How might I have a different process pulling data from the memcache every X
seconds, and pushing it to a different external datastore ?

-- 
Raphael Hsieh


Multiple writers to datastore ?

2014-04-28 Thread Raphael Hsieh
How does Storm handle multiple writers writing data to an external
datastore? If there are multiple workers processing the data, each of them
must also do the "write" to the database. If there are multiple writers to
the database that are storing data, how does it make sure that the writers
don't overwrite each other's data?

I understand that it will read from the datastore first and check the
batchTxId. However, suppose two writers look at a previous batch ID; let's
pretend it's 26.
Now batches 27 and 28 both want to write continued aggregate data. 27
increments on 26. 28 increments on 26. 27 writes first, then 28. However,
28 did not aggregate on top of 27's aggregate, and hence the final data in
the datastore is wrong.

How does storm handle this ?
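
From what I can piece together from the Trident state docs, two things seem
to prevent this: state updates are applied in strict batch (txid) order, so
28's write cannot commit before 27's has succeeded, and with an opaque state
the previous value is stored next to the current one, so a replayed batch
can be re-applied without double counting. A rough sketch of that update
rule, modelled loosely on storm.trident.state.OpaqueValue with made-up
classes and a simple sum as the aggregation (not the actual Storm source):

// Made-up classes that model the idea behind storm.trident.state.OpaqueValue
// and storm.trident.state.map.OpaqueMap; this is not the real Storm code.
public class OpaqueUpdateSketch {

    static class StoredValue {
        long currTxid;   // txid of the batch that produced "curr"
        long curr;       // value after that batch
        long prev;       // value before that batch
    }

    static StoredValue apply(StoredValue stored, long batchTxid, long batchSum) {
        StoredValue out = new StoredValue();
        long base;
        if (stored != null && stored.currTxid == batchTxid) {
            // The same batch is being replayed: start from the value that
            // existed before it, so its contribution is not counted twice.
            base = stored.prev;
        } else {
            // A strictly later batch: the old "curr" becomes the new "prev".
            base = (stored == null) ? 0L : stored.curr;
        }
        out.prev = base;
        out.curr = base + batchSum;
        out.currTxid = batchTxid;
        return out;
    }
}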

-- 
Raphael Hsieh


Re: Flush aggregated data every X seconds

2014-04-24 Thread Raphael Hsieh
Thank you very much for your quick reply Corey,
Unfortunately I don't believe the TickSpout exists within Trident yet. I
have seen the threads discussing the implementation of a sliding window and
I've read Michael Noll's
blog <http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/> about
it as well. I don't need a sliding window, as much as just multiple
window chunks if that makes sense haha.

What I'm thinking about resorting to is increasing my Batch size to be much
larger than the throughput of the spout, then at the end of my topology,
doing an aggregation such that everything aggregates to a single tuple, and
running a ".each" on that single tuple with the function just sleeping for
X time.

My theory is that this should allow the stream to back up enough such that
the next batch takes (roughly) the entire next X time amount of data.

Can anyone validate that this technique will work ?
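
Concretely, the "sleeping .each" described above would be something like the
sketch below (field names made up). I am not claiming it gives exact time
windows; it just stalls each batch so the next one accumulates roughly that
much input, and it assumes max spout pending stays at 1:

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Holds the single aggregated tuple for sleepMillis, so the batch takes at
// least that long end to end.
public class ThrottleFunction extends BaseFunction {
    private final long sleepMillis;

    public ThrottleFunction(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Pass the aggregate through unchanged as an appended field.
        collector.emit(new Values(tuple.getValue(0)));
    }
}

It would be attached after the aggregation, e.g.
.each(new Fields("sum"), new ThrottleFunction(1000), new Fields("throttledSum")).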


On Thu, Apr 24, 2014 at 8:36 PM, Corey Nolet  wrote:

> Raphael, in your case it sounds like a "TickSpout" could be useful where
> you emit a tuple every n time slices and then sleep until needing to emit
> another. I'm not sure how that'd work in a Trident aggregator, however.
>
> I'm not sure if this is something Nathan or the community would approve
> of, but I've been writing my own framework for doing sliding/tumbling
> windows in Storm that allow aggregations and triggering/eviction by count,
> time, and other policies like "when the time difference between the first
> item and the last item in a window is less than x". The bolts could easily
> be ripped out for doing your own aggregations.
>
> It's located here: https://github.com/calrissian/flowbox
>
> It's very much so in the proof of concept stage. My other requirement (and
> the reason I cared so much to implement this) was that the rules need to be
> dynamic and the topology needs to be static as to make the best use of
> resources while users are defining that they need.
>
>
>
> On Thu, Apr 24, 2014 at 11:27 PM, Raphael Hsieh wrote:
>
>> Is there a way in Storm Trident to aggregate data over a certain time
>> period and have it flush the data out to an external data store after that
>> time period is up ?
>>
>> Trident does not have the functionality of Tick Tuples yet, so I cannot
>> use that. Everything I've been researching leads to believe that this is
>> not possible in Storm/Trident, however this seems to me to be a fairly
>> standard use case of any streaming map reduce library.
>>
>> For example,
>> If I am receiving a stream of integers
>> I want to aggregate all those integers over a period of 1 second, then
>> persist it into an external datastore.
>>
>> This is not in order to count how much it will add up to over X amount of
>> time, rather I would like to minimize the read/write/updates I do to said
>> datastore.
>>
>> There are many ways in order to reduce these variables, however all of
>> them force me to modify my schema in ways that are unpleasant. Also, I
>> would rather not have my final external datastore be my scratch space,
>> where my program is reading/updating/writing and checking to make sure that
>> the transaction id's line up.
>> Instead I want that scratch work to be done separately, then the final
>> result stored into a final database that no longer needs to do constant
>> updating.
>>
>> Thanks
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>


-- 
Raphael Hsieh


Flush aggregated data every X seconds

2014-04-24 Thread Raphael Hsieh
Is there a way in Storm Trident to aggregate data over a certain time
period and have it flush the data out to an external data store after that
time period is up ?

Trident does not have the functionality of Tick Tuples yet, so I cannot use
that. Everything I've been researching leads to believe that this is not
possible in Storm/Trident, however this seems to me to be a fairly standard
use case of any streaming map reduce library.

For example,
If I am receiving a stream of integers
I want to aggregate all those integers over a period of 1 second, then
persist it into an external datastore.

This is not in order to count how much it will add up to over X amount of
time, rather I would like to minimize the read/write/updates I do to said
datastore.

There are many ways in order to reduce these variables, however all of them
force me to modify my schema in ways that are unpleasant. Also, I would
rather not have my final external datastore be my scratch space, where my
program is reading/updating/writing and checking to make sure that the
transaction id's line up.
Instead I want that scratch work to be done separately, then the final
result stored into a final database that no longer needs to do constant
updating.

Thanks
-- 
Raphael Hsieh


Re: PersistentAggregate across batches

2014-04-23 Thread Raphael Hsieh
Are you saying that there is no purpose to do a "groupBy" followed by a
PersistentAggregate ? The documentation states: "If you run aggregators on
a grouped stream, the aggregation will be run within each group instead of
against the whole batch."


On Wed, Apr 23, 2014 at 2:17 AM, Danijel Schiavuzzi
wrote:

>
> When I do something like
>>
>> Stream
>> .groupBy(new Fields("a")
>> .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c",
>> "d"), new MyAggregator(), new Fields("resultMap"))
>>
>> What happens (as described 
>> here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
>> is the stream is split into different groups based on field "a":
>>
>
> This is not true. The stream will be grouped based on all the keys you
> specified in persistentAggregate, i.e. new Fields("a", "b", "c", "d"). This
> will produce as many GroupedStreams as there are distinct groupings among
> those keys. Those groupings will then be combined/reduced with the existing
> values gathered from the IBackingMap#multiGet(), and Trident will then call
> multiPut()) to persist the updated aggregations back to the underlying data
> store.
>
> Take a look at the Storm sources under the package "storm.trident.*". A
> good starting point for understanding Trident would be the Java class
> "storm.trident.state.map.TransactionalMap" (or OpaqueMap or
> NonTransactionalMap).
>
> Danijel Schiavuzzi
> www.schiavuzzi.com
>
>
>
>
>
>> [image: Grouping]
>> like so.
>> then, PartitionPersist will run a MultiGet on the fields ("a", "b", "c",
>> "d"), since that is what we are using as our keys. So in each of the
>> "groups" described above, we would have not only the raw tuples resulting
>> from the grouping, but also a single tuple with the result of the previous
>> aggregation.
>> These would all be run through the aggregator, which should be able to
>> handle aggregating with this semi-complete aggregation (The "Reduce"
>> function in a ReducerAggregator, or the "Combine" function in the
>> CombinerAggregator).
>>
>> How does it know not to treat the previous aggregation as a single new
>> tuple? (hence not running the "init" function ? For example if I was
>> aggregating a count, having that previous value (say 60) as a single extra
>> tuple would only increment the count by 1, instead of 60.
>> would I then just need to implement my own "init" function such that it
>> has checks for the tuple  value, whether it is a raw new tuple, vs a
>> previous tuple aggregation?
>>
>>
>> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray wrote:
>>
>>> My understanding is that the process is
>>> 1. multiGet from the IBackingMap  is called and returns a value for each
>>> key (or null if not present)
>>> 2. For each key, the old value from the get and new values in the batch
>>> are fed through the aggregator to produce one value per key
>>> 3. This value is then stored back into the state through the multiPut in
>>> the IBackingMap.
>>>
>>> If you just want to use nathanmarz's trident-memcached integration, you
>>> don't have to write an IBackingMap yourself. The MemcachedState itself
>>> implements IBackingMap to do the get and put. To use it, just decide what
>>> you want to groupBy (these become your keys) and how you want it aggregated
>>> (this is the reduced/combiner implementation). You don't have to write the
>>> memcache connection logic or the aggregation logic yourself unless you want
>>> to change how it's aggregated or stored.
>>> I've not used the trident-memcached state in particular, but in general
>>> this would look something like this:
>>>
>>> topology.newStream("spout1", spout1)
>>>   .groupBy(new Fields("mykeyfield"))
>>>   .persistentAggregate(MemcachedState.opaque(servers), new
>>> Fields("myvaluefield"), new Sum(), new Fields("sum"))
>>>
>>> (Sorry for any code errors; writing in my phone)
>>>
>>> Does that answer your question?
>>>
>>> -Cody
>>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh"  wrote:
>>>
>>>> The Reducer/Combiner Aggregators hold logic in order to aggregate
>>>> across an entire batch, however it does not have 

Re: PersistentAggregate across batches

2014-04-22 Thread Raphael Hsieh
the previous link didn't work,
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#operations-on-grouped-streams


On Tue, Apr 22, 2014 at 10:30 AM, Raphael Hsieh wrote:

> Yes partially,
> The part I was missing was getting old values and feeding it through the
> aggregator again, which still doesn't quite make sense to me.
>
> I am using an external datastore, so I am not able to use the vanilla
> MemcachedState, hence why I am implementing my own version of the
> IBackingMap.
>
> So let me try and explain what I am understanding.
> When I do something like
>
> Stream
> .groupBy(new Fields("a")
> .persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c",
> "d"), new MyAggregator(), new Fields("resultMap"))
>
> What happens (as described 
> here<https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
> is the stream is split into different groups based on field "a":
> [image: Grouping]
> like so.
> then, PartitionPersist will run a MultiGet on the fields ("a", "b", "c",
> "d"), since that is what we are using as our keys. So in each of the
> "groups" described above, we would have not only the raw tuples resulting
> from the grouping, but also a single tuple with the result of the previous
> aggregation.
> These would all be run through the aggregator, which should be able to
> handle aggregating with this semi-complete aggregation (The "Reduce"
> function in a ReducerAggregator, or the "Combine" function in the
> CombinerAggregator).
>
> How does it know not to treat the previous aggregation as a single new
> tuple? (hence not running the "init" function ? For example if I was
> aggregating a count, having that previous value (say 60) as a single extra
> tuple would only increment the count by 1, instead of 60.
> would I then just need to implement my own "init" function such that it
> has checks for the tuple  value, whether it is a raw new tuple, vs a
> previous tuple aggregation?
>
>
> On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray  wrote:
>
>> My understanding is that the process is
>> 1. multiGet from the IBackingMap  is called and returns a value for each
>> key (or null if not present)
>> 2. For each key, the old value from the get and new values in the batch
>> are fed through the aggregator to produce one value per key
>> 3. This value is then stored back into the state through the multiPut in
>> the IBackingMap.
>>
>> If you just want to use nathanmarz's trident-memcached integration, you
>> don't have to write an IBackingMap yourself. The MemcachedState itself
>> implements IBackingMap to do the get and put. To use it, just decide what
>> you want to groupBy (these become your keys) and how you want it aggregated
>> (this is the reduced/combiner implementation). You don't have to write the
>> memcache connection logic or the aggregation logic yourself unless you want
>> to change how it's aggregated or stored.
>> I've not used the trident-memcached state in particular, but in general
>> this would look something like this:
>>
>> topology.newStream("spout1", spout1)
>>   .groupBy(new Fields("mykeyfield"))
>>   .persistentAggregate(MemcachedState.opaque(servers), new
>> Fields("myvaluefield"), new Sum(), new Fields("sum"))
>>
>> (Sorry for any code errors; writing in my phone)
>>
>> Does that answer your question?
>>
>> -Cody
>> On Apr 22, 2014 10:32 AM, "Raphael Hsieh"  wrote:
>>
>>> The Reducer/Combiner Aggregators hold logic in order to aggregate across
>>> an entire batch, however it does not have the logic to aggregate between
>>> batches.
>>> In order for this to happen, it must read the previous TransactionId and
>>> value from the datastore, determine whether this incoming data is in the
>>> right sequence, then increment the value within the datastore.
>>>
>>> I am asking about this second part. Where the logic goes in order to
>>> read previous data from the datastore, and add it to the new incoming
>>> aggregate data.
>>>
>>>
>>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray wrote:
>>>
>>>> Its the ReducerAggregate/CombinerAggregator's job to implement this
>>>> logic. Look at Count and Sum that are built-in to Trident. You can also
>>>> implement your own aggregator.
>>>>
>>>> -Cody
>>>>
>>>>

Re: PersistentAggregate across batches

2014-04-22 Thread Raphael Hsieh
Yes partially,
The part I was missing was getting old values and feeding it through the
aggregator again, which still doesn't quite make sense to me.

I am using an external datastore, so I am not able to use the vanilla
MemcachedState, hence why I am implementing my own version of the
IBackingMap.

So let me try and explain what I am understanding.
When I do something like

Stream
.groupBy(new Fields("a")
.persistentAggregate(new MyStateFactory(), new Fields("a", "b", "c",
"d"), new MyAggregator(), new Fields("resultMap"))

What happens (as described
here <https://github.com/nathanmarz/storm/wiki/Trident-API-Overview>)
is the stream is split into different groups based on field "a":
[image: Grouping]
like so.
then, PartitionPersist will run a MultiGet on the fields ("a", "b", "c",
"d"), since that is what we are using as our keys. So in each of the
"groups" described above, we would have not only the raw tuples resulting
from the grouping, but also a single tuple with the result of the previous
aggregation.
These would all be run through the aggregator, which should be able to
handle aggregating with this semi-complete aggregation (The "Reduce"
function in a ReducerAggregator, or the "Combine" function in the
CombinerAggregator).

How does it know not to treat the previous aggregation as a single new
tuple (and hence not run the "init" function)? For example, if I were
aggregating a count, having that previous value (say 60) treated as a single
extra tuple would only increment the count by 1, instead of 60.
Would I then just need to implement my own "init" function so that it checks
whether the tuple is a raw new tuple or a previous partial aggregation?
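
As far as I can tell from the built-in Count, the stored value is never run
back through init() at all: init() only sees raw input tuples, and the value
multiGet returns is folded in with combine() (with zero() standing in when
nothing is stored yet), so the 60 would be combined rather than re-counted.
The built-in looks roughly like this:

import storm.trident.operation.CombinerAggregator;
import storm.trident.tuple.TridentTuple;

// Mirrors storm.trident.operation.builtin.Count.
public class CountSketch implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return 1L;             // called once per raw input tuple
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;    // also folds the stored value into the batch result
    }

    @Override
    public Long zero() {
        return 0L;             // identity used when multiGet returns nothing
    }
}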


On Tue, Apr 22, 2014 at 9:59 AM, Cody A. Ray  wrote:

> My understanding is that the process is
> 1. multiGet from the IBackingMap  is called and returns a value for each
> key (or null if not present)
> 2. For each key, the old value from the get and new values in the batch
> are fed through the aggregator to produce one value per key
> 3. This value is then stored back into the state through the multiPut in
> the IBackingMap.
>
> If you just want to use nathanmarz's trident-memcached integration, you
> don't have to write an IBackingMap yourself. The MemcachedState itself
> implements IBackingMap to do the get and put. To use it, just decide what
> you want to groupBy (these become your keys) and how you want it aggregated
> (this is the reduced/combiner implementation). You don't have to write the
> memcache connection logic or the aggregation logic yourself unless you want
> to change how it's aggregated or stored.
> I've not used the trident-memcached state in particular, but in general
> this would look something like this:
>
> topology.newStream("spout1", spout1)
>   .groupBy(new Fields("mykeyfield"))
>   .persistentAggregate(MemcachedState.opaque(servers), new
> Fields("myvaluefield"), new Sum(), new Fields("sum"))
>
> (Sorry for any code errors; writing in my phone)
>
> Does that answer your question?
>
> -Cody
> On Apr 22, 2014 10:32 AM, "Raphael Hsieh"  wrote:
>
>> The Reducer/Combiner Aggregators hold logic in order to aggregate across
>> an entire batch, however it does not have the logic to aggregate between
>> batches.
>> In order for this to happen, it must read the previous TransactionId and
>> value from the datastore, determine whether this incoming data is in the
>> right sequence, then increment the value within the datastore.
>>
>> I am asking about this second part. Where the logic goes in order to read
>> previous data from the datastore, and add it to the new incoming aggregate
>> data.
>>
>>
>> On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray wrote:
>>
>>> Its the ReducerAggregate/CombinerAggregator's job to implement this
>>> logic. Look at Count and Sum that are built-in to Trident. You can also
>>> implement your own aggregator.
>>>
>>> -Cody
>>>
>>>
>>> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh wrote:
>>>
>>>> If I am using an opaque spout and doing a persistent aggregate to a
>>>> MemcachedState, how is it aggregating/incrementing the values across all
>>>> batches ?
>>>>
>>>> I'm wanting to implement an IBackingMap so that I can use an external
>>>> datastore. However, I'm unsure where the logic goes that will read the
>>>> previous data, and aggregate it with the new data.
>>>>
>>>> From what I've been told, I need to implement the IBackingMap and the
>>>> multiput/multiget functions. So logically, I think it makes sense that I
>>>> would put this update logiv in the multiput function. However, the
>>>> OpaqueMap class already has multiGet logic in order to check the TxId of
>>>> the batch.
>>>> Instead of using an OpaqueMap class, should I just make my own
>>>> implementation ?
>>>>
>>>> Thanks
>>>> --
>>>> Raphael Hsieh
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Cody A. Ray, LEED AP
>>> cody.a@gmail.com
>>> 215.501.7891
>>>
>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>


-- 
Raphael Hsieh


Re: PersistentAggregate across batches

2014-04-22 Thread Raphael Hsieh
The Reducer/Combiner aggregators hold the logic to aggregate across an
entire batch; however, they do not have the logic to aggregate between
batches.
In order for this to happen, it must read the previous TransactionId and
value from the datastore, determine whether this incoming data is in the
right sequence, then increment the value within the datastore.

I am asking about this second part. Where the logic goes in order to read
previous data from the datastore, and add it to the new incoming aggregate
data.
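
For reference, my current understanding is that this second part lives in
the map wrappers rather than in the IBackingMap itself:
storm.trident.state.map.TransactionalMap (or OpaqueMap / NonTransactionalMap)
calls multiGet, applies the aggregator to the old value plus the new batch
via a ValueUpdater, and then calls multiPut, handling the txid checks along
the way. The IBackingMap only needs the two methods. A toy in-memory sketch,
with made-up names and not meant for production:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import storm.trident.state.TransactionalValue;
import storm.trident.state.map.IBackingMap;

// Toy in-memory IBackingMap; the real one would talk to the external store.
public class InMemoryBackingMap implements IBackingMap<TransactionalValue> {
    private final Map<List<Object>, TransactionalValue> store = new HashMap<>();

    @Override
    public List<TransactionalValue> multiGet(List<List<Object>> keys) {
        List<TransactionalValue> ret = new ArrayList<>(keys.size());
        for (List<Object> key : keys) {
            ret.add(store.get(key));   // null when the key was never written
        }
        return ret;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<TransactionalValue> vals) {
        for (int i = 0; i < keys.size(); i++) {
            store.put(keys.get(i), vals.get(i));
        }
    }
}

It would then be wrapped with TransactionalMap.build(new InMemoryBackingMap())
(or OpaqueMap.build(...) with OpaqueValue for an opaque spout) inside the
StateFactory's makeState().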


On Mon, Apr 21, 2014 at 6:58 PM, Cody A. Ray  wrote:

> Its the ReducerAggregate/CombinerAggregator's job to implement this logic.
> Look at Count and Sum that are built-in to Trident. You can also implement
> your own aggregator.
>
> -Cody
>
>
> On Mon, Apr 21, 2014 at 2:57 PM, Raphael Hsieh wrote:
>
>> If I am using an opaque spout and doing a persistent aggregate to a
>> MemcachedState, how is it aggregating/incrementing the values across all
>> batches ?
>>
>> I'm wanting to implement an IBackingMap so that I can use an external
>> datastore. However, I'm unsure where the logic goes that will read the
>> previous data, and aggregate it with the new data.
>>
>> From what I've been told, I need to implement the IBackingMap and the
>> multiput/multiget functions. So logically, I think it makes sense that I
>> would put this update logiv in the multiput function. However, the
>> OpaqueMap class already has multiGet logic in order to check the TxId of
>> the batch.
>> Instead of using an OpaqueMap class, should I just make my own
>> implementation ?
>>
>> Thanks
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Cody A. Ray, LEED AP
> cody.a@gmail.com
> 215.501.7891
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


PersistentAggregate across batches

2014-04-21 Thread Raphael Hsieh
If I am using an opaque spout and doing a persistent aggregate to a
MemcachedState, how is it aggregating/incrementing the values across all
batches ?

I'm wanting to implement an IBackingMap so that I can use an external
datastore. However, I'm unsure where the logic goes that will read the
previous data, and aggregate it with the new data.

From what I've been told, I need to implement the IBackingMap and the
multiput/multiget functions. So logically, I think it makes sense that I
would put this update logic in the multiPut function. However, the
OpaqueMap class already has multiGet logic in order to check the TxId of
the batch.
Instead of using an OpaqueMap class, should I just make my own
implementation ?

Thanks
-- 
Raphael Hsieh


In memory state, drop data after X time

2014-04-17 Thread Raphael Hsieh
At the end of the documentation here
https://github.com/nathanmarz/storm/wiki/Trident-tutorial#state it mentions:

 State's are not required to hold onto state forever. For example, you
could have an in-memory State implementation that only keeps the last X
hours of data available and drops anything older. Take a look at the
implementation of the Memcached integration
<https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java>
for an example State implementation.

How would I go about doing  this ?
In Trident I haven't found an easy way to store a state for a period of
time, then flush it out to an external datastore.
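
For the drop-anything-older-than-X half, one hedged option (Guava should
already be on Storm's classpath, if I'm not mistaken) would be to back the
in-memory map with a cache that expires entries on its own. It does not
solve the periodic flush to an external datastore, though:

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class ExpiringMapSketch {
    // Entries silently disappear two hours after they were last written,
    // which gives the "only keep the last X hours" behaviour for free.
    private final Cache<String, Long> lastTwoHours = CacheBuilder.newBuilder()
            .expireAfterWrite(2, TimeUnit.HOURS)
            .build();

    public void put(String key, Long value) {
        lastTwoHours.put(key, value);
    }

    public Long get(String key) {
        return lastTwoHours.getIfPresent(key);   // null once expired or never set
    }
}

The put/get here would sit behind the multiPut/multiGet of an IBackingMap,
as in the Memcached example linked above.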

-- 
Raphael Hsieh


Re: How to think of batches vs partitions

2014-04-17 Thread Raphael Hsieh
Oh OK,
So if I want the guarantee of single-message processing when using an
external datastore, I need to implement the methods described here
<https://github.com/nathanmarz/storm/wiki/Trident-state#opaque-transactional-spouts>
myself?



On Thu, Apr 17, 2014 at 11:00 AM, Nathan Marz  wrote:

> aggregate / partitionAggregate are only aggregations within the current
> batch, the persistent equivalents are aggregations across all batches.
>
> The logic for querying states, updating them, and keeping track of batch
> ids happens in the states themselves. For example, look at the multiUpdate
> method in TransactionalMap:
> https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/map/TransactionalMap.java
>
> Things are structured so that TransactionalMap delegates to an
> "IBackingMap" which handles the actual persistence. IBackingMap just has
> multiGet and multiPut methods. An implementation for a database (like
> Cassandra, Riak, HBase, etc.) just has to implement IBackingMap.
>
>
> On Thu, Apr 17, 2014 at 10:15 AM, Raphael Hsieh wrote:
>
>> I guess I'm just confused as to when "multiGet" and "multiPut" are called
>> when using an implementation of the IBackingMap
>>
>>
>> On Thu, Apr 17, 2014 at 8:33 AM, Raphael Hsieh wrote:
>>
>>> So from my understanding, this is how the different spout types
>>> guarantee single message processing. For example, an opaque transactional
>>> spout will look at transaction id's in order to guarantee in order batch
>>> processing, making sure that the txid's are processed in order, and using
>>> the previous and current values to fix any mixups.
>>>
>>> When doing an aggregation does it aggregation across all batches ? If
>>> so, how does this happen ? Will it query the datastore for the current
>>> value, then add the current aggregate value to the stored value in order to
>>> create the global aggregate ? Where does this logic happen ? I can't seem
>>> to find where this happens in the persistentAggregate or even
>>> partitionPersist...
>>>
>>>
>>> On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz wrote:
>>>
>>>> Batches are processed sequentially, but each batch is partitioned (and
>>>> therefore processed in parallel). As a batch is processed, it can be
>>>> repartitioned an arbitrary number of times throughout the Trident topology.
>>>>
>>>>
>>>> On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh wrote:
>>>>
>>>>> Hi I found myself being confused on how to think of Storm/Trident
>>>>> processing batches.
>>>>> Are batches processed sequentially, but split into multiple partitions
>>>>> that are spread throughout the worker nodes ?
>>>>>
>>>>> Or are batches processed in parrallel and spread among worker nodes to
>>>>> be split into partitions within each host running on multiple threads ?
>>>>>
>>>>> Thanks!
>>>>> --
>>>>> Raphael Hsieh
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Twitter: @nathanmarz
>>>> http://nathanmarz.com
>>>>
>>>
>>>
>>>
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>
>>
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Twitter: @nathanmarz
> http://nathanmarz.com
>



-- 
Raphael Hsieh


Re: How to think of batches vs partitions

2014-04-17 Thread Raphael Hsieh
I guess I'm just confused as to when "multiGet" and "multiPut" are called
when using an implementation of the IBackingMap


On Thu, Apr 17, 2014 at 8:33 AM, Raphael Hsieh  wrote:

> So from my understanding, this is how the different spout types guarantee
> single message processing. For example, an opaque transactional spout will
> look at transaction id's in order to guarantee in order batch processing,
> making sure that the txid's are processed in order, and using the previous
> and current values to fix any mixups.
>
> When doing an aggregation does it aggregation across all batches ? If so,
> how does this happen ? Will it query the datastore for the current value,
> then add the current aggregate value to the stored value in order to create
> the global aggregate ? Where does this logic happen ? I can't seem to find
> where this happens in the persistentAggregate or even partitionPersist...
>
>
> On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz wrote:
>
>> Batches are processed sequentially, but each batch is partitioned (and
>> therefore processed in parallel). As a batch is processed, it can be
>> repartitioned an arbitrary number of times throughout the Trident topology.
>>
>>
>> On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh wrote:
>>
>>> Hi I found myself being confused on how to think of Storm/Trident
>>> processing batches.
>>> Are batches processed sequentially, but split into multiple partitions
>>> that are spread throughout the worker nodes ?
>>>
>>> Or are batches processed in parrallel and spread among worker nodes to
>>> be split into partitions within each host running on multiple threads ?
>>>
>>> Thanks!
>>> --
>>> Raphael Hsieh
>>>
>>>
>>>
>>>
>>
>>
>>
>> --
>> Twitter: @nathanmarz
>> http://nathanmarz.com
>>
>
>
>
> --
> Raphael Hsieh
>
>
>



-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


Re: How to think of batches vs partitions

2014-04-17 Thread Raphael Hsieh
So from my understanding, this is how the different spout types guarantee
single message processing. For example, an opaque transactional spout will
look at transaction id's in order to guarantee in order batch processing,
making sure that the txid's are processed in order, and using the previous
and current values to fix any mixups.

When doing an aggregation, does it aggregate across all batches? If so,
how does this happen ? Will it query the datastore for the current value,
then add the current aggregate value to the stored value in order to create
the global aggregate ? Where does this logic happen ? I can't seem to find
where this happens in the persistentAggregate or even partitionPersist...


On Wed, Apr 16, 2014 at 8:30 PM, Nathan Marz  wrote:

> Batches are processed sequentially, but each batch is partitioned (and
> therefore processed in parallel). As a batch is processed, it can be
> repartitioned an arbitrary number of times throughout the Trident topology.
>
>
> On Wed, Apr 16, 2014 at 4:28 PM, Raphael Hsieh wrote:
>
>> Hi I found myself being confused on how to think of Storm/Trident
>> processing batches.
>> Are batches processed sequentially, but split into multiple partitions
>> that are spread throughout the worker nodes ?
>>
>> Or are batches processed in parrallel and spread among worker nodes to be
>> split into partitions within each host running on multiple threads ?
>>
>> Thanks!
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>
>
> --
> Twitter: @nathanmarz
> http://nathanmarz.com
>



-- 
Raphael Hsieh


How to think of batches vs partitions

2014-04-16 Thread Raphael Hsieh
Hi I found myself being confused on how to think of Storm/Trident
processing batches.
Are batches processed sequentially, but split into multiple partitions that
are spread throughout the worker nodes ?

Or are batches processed in parallel and spread among worker nodes to be
split into partitions within each host running on multiple threads ?

Thanks!
-- 
Raphael Hsieh


Re: setting trident transaction window

2014-04-15 Thread Raphael Hsieh
The documentation for trident aggregations mentions that aggregations are
done globally across all batches.
https://github.com/nathanmarz/storm/wiki/Trident-API-Overview#aggregation-operations
Is this incorrect ?

When the documentation says "all batches" it does mean all batches across
all worker nodes right ?


On Thu, Apr 10, 2014 at 3:21 PM, Jason Jackson  wrote:

>
>
>
> stream.aggregate(...) computes aggregations per batch
> stream.groupBy(..).aggregate(..) computes aggregations per key per batch.
>
> stream.persistentAggregate(..) computes partial aggregations per batch, DB
> is updated with partial aggregates per batch
> stream.groupBy(..).persistentAggregate(..) computes partial aggregations
> per batch per key, DB is updated with partial aggregates per batch.
>
> you can also think of persistent aggregate as computing DB deltas, and
> only sending the deltas to the DB.
>
> Whether increasing the batch size reduces the total amount of writes to
> persistent store is not strictly true, but in practice it does. E.g.
> imagine our stream has 5 unique keys and you do a
> groupBy.persistentAggregate, and the throughput is 1B/sec. If you have
> batches of 5B items, then after 5 seconds you sent ~10 key/vals updates to
> the DB, if you have batches of 0.5B items, then after 10 seconds you've
> sent ~100 key/vals to the DB.
>
> Some of the tradeoffs here are that if a batch fails you have to do more
> recomputation. And greater chance for a batch to fail as there's more
> tuples per batch. This should though, definitely give it a shot.
>
>
>
>
>
>
> On Thu, Apr 10, 2014 at 1:33 PM, Raphael Hsieh wrote:
>
>> Thanks for your reply Jason,
>> So what I'm hearing is that there is no nice way of doing temporal
>> flushes to a database. My main reason for wanting to do this is because I
>> want to use DynamoDB for my external datastore, but it gets expensive. I
>> would like to limit my reads and writes as much as I can so that the cost
>> does not add up.
>>
>> Increasing the batch size seems like the best solution so far, however
>> from my understanding doing an aggregation in storm/trident does a global
>> aggregation, so do batch sizes really make a difference ? Or is my
>> understanding of the aggregation process wrong. I am had though that
>> aggregating is global among all partitions (and storm nodes).
>>
>>
>> On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson wrote:
>>
>>> trident doesn't expose tick tuples in it's API yet, even though it was
>>> added in storm a while ago.
>>>
>>> There's two problems I think you're talking about (1) windowed
>>> aggregations (2) reducing DB load.
>>>
>>> For (1)
>>> Trident can do aggregations at the batch level but this doesn't really
>>> help you for doing aggregations over a range of timestamps. The way to do
>>> that is you would include the timebucket in your key when
>>> persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40:64"
>>> for minutely buckets. Then when serving the data you would query all keys
>>> across the time range. Certain databases such as cassandra can make this
>>> query very fast.
>>>
>>> (2) You'll need to implement your own IBackingMap persistent store
>>> plugin and pass it to persistentAggregate. Look other examples such as the
>>> trident-memcache for how to implement these. So for your custom persistent
>>> store plugin  you could use a combination of in-memory map and DB. 4/5
>>> batches would just commit their state updates to the in-memory map. The 5th
>>> batch would commit to the in-memory map, and then flush that map to the
>>> database. You could even launch a separate thread to do the flushing,
>>> incase it takes a while. This design however is not going to give you
>>> exactly once semantics. As if you loose the in-memory map because your
>>> worker died for example, then when it comes back online it will still
>>> resume from the last successful batch (not the last flushed batch).
>>>
>>> To retain exactly once semantics you could also make your batch sizes
>>> much larger, by default they read 1MB from each kafka partition (see
>>> bufferSize and fetchSize configuration option in Kafka Spout). IF you
>>> increased batch size, and you're doing some kind of key based aggregations,
>>> then this would reduce the total number of writes you would need to do your
>>> persistent storage.
>>>
>>> Trident could definitely be improved here, so y

Re: setting trident transaction window

2014-04-10 Thread Raphael Hsieh
Thanks for your reply Jason,
So what I'm hearing is that there is no nice way of doing temporal flushes
to a database. My main reason for wanting to do this is because I want to
use DynamoDB for my external datastore, but it gets expensive. I would like
to limit my reads and writes as much as I can so that the cost does not add
up.

Increasing the batch size seems like the best solution so far; however, from
my understanding, doing an aggregation in Storm/Trident does a global
aggregation, so do batch sizes really make a difference? Or is my
understanding of the aggregation process wrong? I had thought that
aggregating is global among all partitions (and Storm nodes).
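
For the time-bucketed keys Jason suggests below, the extra field could come
from something as simple as this sketch (the bucket format is made up),
which would then be included in the groupBy so each persistentAggregate row
covers one minute:

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Appends a minute-granularity bucket field derived from wall-clock time.
public class MinuteBucket extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        long minute = System.currentTimeMillis() / 60000L;
        collector.emit(new Values("minute-" + minute));
    }
}

Grouping on the original key plus the bucket means a read for a time range
just queries the relevant buckets.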


On Thu, Apr 10, 2014 at 1:58 AM, Jason Jackson  wrote:

> trident doesn't expose tick tuples in it's API yet, even though it was
> added in storm a while ago.
>
> There's two problems I think you're talking about (1) windowed
> aggregations (2) reducing DB load.
>
> For (1)
> Trident can do aggregations at the batch level but this doesn't really
> help you for doing aggregations over a range of timestamps. The way to do
> that is you would include the timebucket in your key when
> persistentAggregate. E.g. your key could be "apple-2014-01-02-12:40:64"
> for minutely buckets. Then when serving the data you would query all keys
> across the time range. Certain databases such as cassandra can make this
> query very fast.
>
> (2) You'll need to implement your own IBackingMap persistent store plugin
> and pass it to persistentAggregate. Look other examples such as the
> trident-memcache for how to implement these. So for your custom persistent
> store plugin  you could use a combination of in-memory map and DB. 4/5
> batches would just commit their state updates to the in-memory map. The 5th
> batch would commit to the in-memory map, and then flush that map to the
> database. You could even launch a separate thread to do the flushing,
> incase it takes a while. This design however is not going to give you
> exactly once semantics. As if you loose the in-memory map because your
> worker died for example, then when it comes back online it will still
> resume from the last successful batch (not the last flushed batch).
>
> To retain exactly once semantics you could also make your batch sizes much
> larger, by default they read 1MB from each kafka partition (see bufferSize
> and fetchSize configuration option in Kafka Spout). IF you increased batch
> size, and you're doing some kind of key based aggregations, then this would
> reduce the total number of writes you would need to do your persistent
> storage.
>
> Trident could definitely be improved here, so your mileage may vary.
>
>
> On Wed, Apr 9, 2014 at 9:15 AM, Raphael Hsieh wrote:
>
>> I have been struggling to figure out how to get trident to aggregate data
>> over a certain time period then flush the data to an external data store.
>> The reasoning behind this is to reduce the number of reads and writes
>> sent to the database.
>>
>> I've seen that Storm allows for tick tuples to be inserted into the
>> stream, however I can't figure out how to do this with trident. I had
>> thought that this functionality was added with Storm version 0.8.0 ?
>> Is this the case ?
>>
>> One thing I had tried was to create a new stream that emitted a tuple
>> once every X time period, then I tried to merge this stream into my actual
>> data stream. However, doing this would result in a non transactional stream
>> which would be no good. Also it didn't work, as the resulting stream only
>> consisted of tuples from my clock stream.
>>
>> Can anybody help me figure out how to have Trident aggregate data over a
>> certain time frame, flush it out to an external datastore, then rinse and
>> repeat ?
>>
>> there are some blogs out there regarding how to use a sliding window in
>> storm, however I just want sequential windows in Trident.
>>
>> Thanks
>>
>> --
>> Raphael Hsieh
>>
>>
>>
>>
>
>


-- 
Raphael Hsieh
Amazon.com
Software Development Engineer I
(978) 764-9014


setting trident transaction window

2014-04-09 Thread Raphael Hsieh
I have been struggling to figure out how to get trident to aggregate data
over a certain time period then flush the data to an external data store.
The reasoning behind this is to reduce the number of reads and writes sent
to the database.

I've seen that Storm allows for tick tuples to be inserted into the stream,
however I can't figure out how to do this with trident. I had thought that
this functionality was added with Storm version 0.8.0 ?
Is this the case ?

One thing I had tried was to create a new stream that emitted a tuple once
every X time period, then I tried to merge this stream into my actual data
stream. However, doing this would result in a non transactional stream
which would be no good. Also it didn't work, as the resulting stream only
consisted of tuples from my clock stream.

Can anybody help me figure out how to have Trident aggregate data over a
certain time frame, flush it out to an external datastore, then rinse and
repeat ?

there are some blogs out there regarding how to use a sliding window in
storm, however I just want sequential windows in Trident.

Thanks

-- 
Raphael Hsieh