If I understand you correctly (I have never used Redis), you put the data into a table. In Spout.nextTuple() you do a table scan, remember the current timestamp, and do the next table scan a minute later.
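The once-a-minute scan described above can be tried out without Storm. This is a minimal sketch, assuming the latest value per device sits in a plain in-memory map that stands in for the Redis table. The class name `PeriodicScanner`, the `poll` method and its explicit `now` parameter are hypothetical; the explicit time argument just makes the timing testable without a real clock. Each call returns immediately. It either performs a full scan and returns every row, or returns an empty list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a spout's nextTuple(): scans a table at most once per interval
public class PeriodicScanner {
    private static final long SCAN_INTERVAL_MS = 60_000; // one minute

    private final Map<String, Long> table;  // stand-in for the Redis table (device -> latest value)
    private long lastScan = Long.MIN_VALUE; // MIN_VALUE forces a scan on the first call

    public PeriodicScanner(Map<String, Long> table) {
        this.table = table;
    }

    // Never blocks: returns all rows if a scan is due, otherwise an empty list
    public List<Map.Entry<String, Long>> poll(long now) {
        if (now < lastScan + SCAN_INTERVAL_MS) {
            return Collections.emptyList(); // not due yet, return immediately
        }
        lastScan = now;
        // Snapshot every row; a real spout would emit each row as a tuple instead
        List<Map.Entry<String, Long>> rows = new ArrayList<>();
        for (Map.Entry<String, Long> e : table.entrySet()) {
            rows.add(Map.entry(e.getKey(), e.getValue()));
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Long> table = new TreeMap<>();
        table.put("A", 1L);
        table.put("B", 10L);
        table.put("C", 4L);

        PeriodicScanner scanner = new PeriodicScanner(table);
        System.out.println(scanner.poll(0).size());      // 3: the first call always scans
        System.out.println(scanner.poll(30_000).size()); // 0: only 30 s since the last scan
        table.put("A", 5L);                              // device A reports a new value
        long sum = 0;
        for (Map.Entry<String, Long> e : scanner.poll(60_000)) {
            sum += e.getValue();
        }
        System.out.println(sum);                         // 19 = 5 + 10 + 4
    }
}
```

Because the old value of a device simply stays in the table until it is overwritten, the sum over a full scan always uses the latest valid value of every device.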
Just make sure to return from Spout.nextTuple() and not block while waiting for the next scan! Something like:

    private long ts = Long.MIN_VALUE; // time of the last table scan

    public void nextTuple() {
        long time = System.currentTimeMillis();
        if (time >= ts + 60_000) { // at least one minute since the last scan
            // do table scan and emit all tuples
            ts = time;
        }
        // else do nothing and return immediately
    }

On 04/11/2016 02:04 PM, Daniela Stoiber wrote:
> Hi Matthias,
>
> Thank you very much for your reply.
>
> How can I ensure that the spout fetches the values every minute?
>
> Thank you in advance.
>
> Regards,
> Daniela
>
>
> 2016-04-11 11:10 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
>
>> Hi,
>>
>> @Arun: thanks for correcting me (it's hard to keep up to date with the
>> latest changes these days... :))
>>
>> @Daniela:
>> - processing time means that the windows are aligned to the wall-clock
>> time of your machine when you process your data; this implies some
>> non-determinism and no repeatability if you process historical data
>> - event time means that the windows are aligned to timestamps that are
>> encoded in your tuples (e.g., as an attribute); this allows for
>> deterministic processing, as the result of a computation is independent
>> of the time you perform the computation
>>
>> For your Redis idea:
>>
>> You can certainly do this. A Spout fetches all data from Redis each
>> minute and forwards it to the window bolt. I.e., you do not fetch directly
>> within your agg-bolt.
>>
>> However, if you use a custom aggregate function, you might be able to do
>> this without Redis in between and de-duplicate in your aggregate function
>> directly. When the window closes, you go over each value in the window
>> and store the value for each device in a hash-map (key: Device-ID). If a
>> second value for the same device comes in, you overwrite the first one.
>> As long as you do not have too many devices (i.e., the window and the
>> hash-map fit in memory) this should be the simplest approach.
>>
>>
>> -Matthias
>>
>>
>> On 04/10/2016 10:46 PM, Daniela Stoiber wrote:
>>> Hi Arun,
>>>
>>> thank you for your reply.
>>>
>>> But my problem is that I need to add up the values over all devices, but
>>> I am only allowed to use the most recent value of each device. A value is
>>> valid as long as there is no new value available for this device.
>>>
>>> So if I receive a message from device A with value 1, value 1 should be
>>> used for the sum as long as the value of A does not change.
>>> When I receive a new value for A, the new value should be used for the
>>> sum and the old one should be replaced.
>>>
>>> Therefore I thought of using Redis to store this information:
>>> Device  Value
>>> A       1
>>> B       10
>>> C       4
>>>
>>> Then I would like to pull the most recent value of each device every
>>> minute to build the sum. Therefore I would like to use the windowed bolt.
>>> But I am not sure if it is possible to pull data out of Redis within a
>>> windowed bolt.
>>>
>>> Thank you in advance.
>>>
>>> Regards,
>>> Daniela
>>>
>>>
>>> -----Original Message-----
>>> From: Arun Iyer [mailto:ai...@hortonworks.com] on behalf of Arun Mahadevan
>>> Sent: Sunday, April 10, 2016 20:55
>>> To: dev@storm.apache.org
>>> Subject: Re: AW: Use only latest values
>>>
>>> Hi Matthias,
>>>
>>> WindowedBolt does support event time. In Trident it is not yet exposed.
>>>
>>> Hi Daniela,
>>>
>>> You could solve your use case in different ways. One would be to have a
>>> WindowedBolt with a 1 min tumbling window, do your custom aggregation
>>> (e.g. sum) every time the window tumbles, and emit the results to another
>>> bolt where you update the count in Redis. Most of your state saving could
>>> also be automated by defining a stateful bolt that periodically
>>> checkpoints your state (sum per device). You could also combine both
>>> windowing and state into a StatefulWindowedBolt implementation. You can
>>> evaluate the options and decide based on your use case.
>>>
>>> Take a look at the sample topologies (SlidingWindowTopology,
>>> SlidingTupleTsTopology, StatefulTopology, StatefulWindowingTopology) in
>>> storm-starter and the docs for more info.
>>>
>>> https://github.com/apache/storm/blob/master/docs/Windowing.md
>>>
>>> https://github.com/apache/storm/blob/master/docs/State-checkpointing.md
>>>
>>>
>>> -Arun
>>>
>>>
>>> On 4/10/16, 4:30 PM, "Matthias J. Sax" <mj...@apache.org> wrote:
>>>
>>>> A tumbling window (i.e., a non-overlapping window) is the right approach
>>>> (a sliding window is overlapping).
>>>>
>>>> The window goes into your aggregation bolt (windowing and aggregation
>>>> go hand in hand, i.e., when the window gets closed, the aggregation is
>>>> triggered and the window content is handed over to the aggregation
>>>> function).
>>>>
>>>> Be aware that Storm (currently) only supports processing-time windows
>>>> (and no event-time windows).
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 04/10/2016 09:56 AM, Daniela Stoiber wrote:
>>>>> Hi,
>>>>>
>>>>> thank you for your reply.
>>>>>
>>>>> How can I ensure that the latest values are pulled from Redis and the
>>>>> sum is updated every minute? Do I need a sliding window with an
>>>>> interval of 1 minute? Where would this sliding window be located in my
>>>>> topology?
>>>>>
>>>>> Thank you in advance.
>>>>>
>>>>> Regards,
>>>>> Daniela
>>>>>
>>>>> -----Original Message-----
>>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>>> Sent: Saturday, April 9, 2016 12:13
>>>>> To: dev@storm.apache.org
>>>>> Subject: Re: Use only latest values
>>>>>
>>>>> Sounds reasonable.
>>>>>
>>>>>
>>>>> On 04/09/2016 08:34 AM, Daniela Stoiber wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I would like to cache values and to use only the latest "valid"
>>>>>> values to build a sum.
>>>>>>
>>>>>> In more detail, I receive values from devices periodically. I would
>>>>>> like to add up all the valid values each minute. But not every
>>>>>> device sends a new value every minute.
>>>>>> And as long as there is no
>>>>>> new value, the old one should be used for the sum. As soon as I
>>>>>> receive a new value from a device, I would like to overwrite the old
>>>>>> value and use the new one for the sum. Would that be possible
>>>>>> with the combination of Storm and Redis?
>>>>>>
>>>>>> My idea was to use the following:
>>>>>>
>>>>>> - Kafka Spout
>>>>>> - Storm Bolt for storing the tuples in Redis and for overwriting the
>>>>>>   values as soon as a new one is delivered
>>>>>> - Storm Bolt for reading the latest tuples from Redis
>>>>>> - Storm Bolt for grouping (I would like to group the devices per
>>>>>>   region)
>>>>>> - Storm Bolt for aggregation
>>>>>> - Storm Bolt for storing the results again in Redis
>>>>>>
>>>>>> Thank you in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Daniela
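Matthias's alternative in the thread (de-duplicating with a hash-map keyed by device ID inside a custom aggregate, instead of going through Redis), combined with the per-region grouping from Daniela's pipeline, can be sketched in plain Java. This is an illustration under assumptions, not the thread's actual code. The map is kept across windows rather than rebuilt per window, so that a device's last value stays valid until it reports a new one, as Daniela requires. The `Reading` record, its fields, and the `onWindowClosed` method are hypothetical. In Storm this logic would presumably be called from a windowed bolt's `execute(TupleWindow)` with a one-minute tumbling window.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical aggregator: keeps the latest value per device across windows
// and computes per-region sums each time a window closes.
public class LatestValueAggregator {

    // One reading from a device; field names are illustrative assumptions
    public record Reading(String deviceId, String region, long value) {}

    // device ID -> most recent reading; kept across windows so that an old
    // value stays valid until the device sends a new one
    private final Map<String, Reading> latest = new HashMap<>();

    // Call once per closed (tumbling) window with its readings in arrival order;
    // a later reading for the same device overwrites the earlier one.
    public Map<String, Long> onWindowClosed(List<Reading> window) {
        for (Reading r : window) {
            latest.put(r.deviceId(), r);
        }
        Map<String, Long> sumPerRegion = new TreeMap<>(); // sorted for stable output
        for (Reading r : latest.values()) {
            sumPerRegion.merge(r.region(), r.value(), Long::sum);
        }
        return sumPerRegion;
    }

    public static void main(String[] args) {
        LatestValueAggregator agg = new LatestValueAggregator();
        // Minute 1: A reports twice, only its second value (3) counts
        System.out.println(agg.onWindowClosed(List.of(
                new Reading("A", "north", 1),
                new Reading("B", "north", 10),
                new Reading("C", "south", 4),
                new Reading("A", "north", 3)))); // {north=13, south=4}
        // Minute 2: only C reports, A and B keep their previous values
        System.out.println(agg.onWindowClosed(List.of(
                new Reading("C", "south", 7)))); // {north=13, south=7}
    }
}
```

The map lives only in memory here, so it is lost if the worker restarts. Persisting it is where Arun's stateful-bolt suggestion or the Redis table from the original plan would come in.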