Of course, with advancing ts for the next table scan...

> void nextTuple() {
>     long time = System.currentTimeMillis();
>     if (ts < time) {
>         // do table scan and emit all tuples
>         ts = ts == Long.MIN_VALUE ? time + 60000 : ts + 60000;
>     }
>     // else do nothing
> }
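Fleshed out, such a spout could look roughly like the sketch below. The class name, the Redis hash key, the (deviceId, value) output fields, and the use of Jedis as the Redis client are assumptions for illustration only, not from the thread; the packages are the org.apache.storm ones of Storm 1.x.

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import redis.clients.jedis.Jedis;

// Illustrative sketch only: scans a Redis hash once per minute and emits every entry.
public class RedisScanSpout extends BaseRichSpout {

    private static final long SCAN_INTERVAL_MS = 60000L;

    private SpoutOutputCollector collector;
    private Jedis jedis;
    private long ts = Long.MIN_VALUE;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.jedis = new Jedis("localhost", 6379); // assumed Redis location
    }

    @Override
    public void nextTuple() {
        long time = System.currentTimeMillis();
        if (ts < time) {
            // "table scan": read the whole device -> value hash and emit every entry
            Map<String, String> snapshot = jedis.hgetAll("device_values"); // assumed key
            for (Map.Entry<String, String> e : snapshot.entrySet()) {
                collector.emit(new Values(e.getKey(), Double.parseDouble(e.getValue())));
            }
            // schedule the next scan one minute ahead instead of "now", otherwise
            // every nextTuple() call would trigger another scan
            ts = ts == Long.MIN_VALUE ? time + SCAN_INTERVAL_MS : ts + SCAN_INTERVAL_MS;
        }
        // else: return immediately; never block while waiting for the next scan
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("deviceId", "value"));
    }
}

The important part is the final branch of nextTuple(): when no scan is due it returns immediately instead of sleeping, exactly as Matthias warns below.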
On 04/11/2016 02:08 PM, Matthias J. Sax wrote:
> If I understand you correctly (I never used Redis), you put the data
> into a table. In Spout.next() we do a table scan, remember the current
> time stamp, and do the next table scan a minute later.
>
> Just make sure to return and not block in Spout.next() while waiting for
> the next scan!
>
> Something like:
>
> private long ts = Long.MIN_VALUE;
>
> void nextTuple() {
>     long time = System.currentTimeMillis();
>     if (ts < time) {
>         // do table scan and emit all tuples
>         ts = time;
>     }
>     // else do nothing
> }
>
>
> On 04/11/2016 02:04 PM, Daniela Stoiber wrote:
>> Hi Matthias,
>>
>> Thank you very much for your reply.
>>
>> How can I ensure that the spout fetches the values every minute?
>>
>> Thank you in advance.
>>
>> Regards,
>> Daniela
>>
>>
>> 2016-04-11 11:10 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
>>
>>> Hi,
>>>
>>> @Arun: thanks for correcting me (it's hard to keep up to date with the
>>> latest changes these days... :))
>>>
>>> @Daniela:
>>> - processing time means that the windows are aligned to the wall-clock
>>>   time of your machine when you process your data; this implies some
>>>   non-determinism and non-repeatability if you process historical data
>>> - event time means that the windows are aligned to timestamps that are
>>>   encoded in your tuples (eg, as an attribute); this allows for
>>>   deterministic processing, as the result of a computation is independent
>>>   of the time you perform the computation
>>>
>>> For your Redis idea:
>>>
>>> You can certainly do this. A spout fetches all data from Redis each
>>> minute and forwards it to the window bolt. Ie, you do not fetch directly
>>> within your agg-bolt.
>>>
>>> However, if you use a custom aggregate function, you might be able to do
>>> this without Redis in between and de-duplicate in your aggregate function
>>> directly: when the window closes, you go over each value in the window and
>>> store the value for each device in a hash-map (key: Device-ID). If a
>>> second value comes in for a device, you overwrite the old one. As long as
>>> you do not have too many devices (ie, the window and hash-map do fit in
>>> memory), this should be the simplest approach.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 04/10/2016 10:46 PM, Daniela Stoiber wrote:
>>>> Hi Arun,
>>>>
>>>> thank you for your reply.
>>>>
>>>> But my problem is that I need to add up the values over all devices, but
>>>> I am only allowed to use the most recent value of each device. A value is
>>>> valid as long as there is no new value for this device available.
>>>>
>>>> So if I receive a message from device A with value 1, value 1 should be
>>>> used for the sum as long as the value of A does not change.
>>>> When I receive a new value for A, the new value should be used for the
>>>> sum and the old one should be replaced.
>>>>
>>>> Therefore I thought to use Redis to store this information:
>>>>
>>>>     Device    Value
>>>>     A         1
>>>>     B         10
>>>>     C         4
>>>>
>>>> Then I would like to pull the most recent value of each device every
>>>> minute to build the sum. Therefore I would like to use the windowed bolt.
>>>> But I am not sure if it is possible to pull data out of Redis within a
>>>> windowed bolt.
>>>>
>>>> Thank you in advance.
>>>>
>>>> Regards,
>>>> Daniela
>>>>
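As a concrete illustration of the de-duplicating aggregate function Matthias describes above, a windowed bolt might keep the latest value per device in a hash-map and emit one sum per window. The sketch below is not code from the thread: the class name and the assumption that tuples carry "deviceId" and "value" fields are made up for illustration, and the map is deliberately kept across windows so a device that stays silent still contributes its last known value, as Daniela requires.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

// Illustrative sketch only: sums the latest value per device once per window.
public class DedupSumBolt extends BaseWindowedBolt {

    private OutputCollector collector;
    // last known value per device; kept across windows so silent devices
    // still contribute their old value to the sum
    private final Map<String, Double> latest = new HashMap<>();

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow window) {
        // later tuples for the same device overwrite earlier ones
        for (Tuple t : window.get()) {
            latest.put(t.getStringByField("deviceId"), t.getDoubleByField("value"));
        }
        double sum = 0.0;
        for (double v : latest.values()) {
            sum += v;
        }
        collector.emit(new Values(sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sum"));
    }
}

This in-memory state is lost when a worker restarts; the state checkpointing Arun mentions further down in the thread is one way to address that.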
>>>> -----Original Message-----
>>>> From: Arun Iyer [mailto:ai...@hortonworks.com] On behalf of Arun Mahadevan
>>>> Sent: Sunday, April 10, 2016 20:55
>>>> To: dev@storm.apache.org
>>>> Subject: Re: AW: Use only latest values
>>>>
>>>> Hi Matthias,
>>>>
>>>> WindowedBolt does support event time. In Trident it is not yet exposed.
>>>>
>>>> Hi Daniela,
>>>>
>>>> You could solve your use case in different ways. One would be to have a
>>>> WindowedBolt with a 1 min tumbling window, do your custom aggregation (e.g.
>>>> sum) every time the window tumbles, and emit the results to another bolt
>>>> where you update the count in Redis. Most of your state saving could also
>>>> be automated by defining a Stateful bolt that would periodically checkpoint
>>>> your state (sum per device). You could also combine both windowing and
>>>> state in a StatefulWindowedBolt implementation. You can evaluate the
>>>> options and decide based on your use case.
>>>>
>>>> Take a look at the sample topologies (SlidingWindowTopology,
>>>> SlidingTupleTsTopology, StatefulTopology, StatefulWindowingTopology) in
>>>> storm-starter and the docs for more info.
>>>>
>>>> https://github.com/apache/storm/blob/master/docs/Windowing.md
>>>>
>>>> https://github.com/apache/storm/blob/master/docs/State-checkpointing.md
>>>>
>>>>
>>>> -Arun
>>>>
>>>>
>>>> On 4/10/16, 4:30 PM, "Matthias J. Sax" <mj...@apache.org> wrote:
>>>>
>>>>> A tumbling window (ie, non-overlapping window) is the right approach (a
>>>>> sliding window is overlapping).
>>>>>
>>>>> The window goes into your aggregation bolt (windowing and aggregation
>>>>> go hand in hand, ie, when the window gets closed, the aggregation is
>>>>> triggered and the window content is handed over to the aggregation
>>>>> function).
>>>>>
>>>>> Be aware that Storm (currently) only supports processing-time windows
>>>>> (and no event-time windows).
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 04/10/2016 09:56 AM, Daniela Stoiber wrote:
>>>>>> Hi,
>>>>>>
>>>>>> thank you for your reply.
>>>>>>
>>>>>> How can I ensure that the latest values are pulled from Redis and the
>>>>>> sum is updated every minute? Do I need a sliding window with an interval
>>>>>> of 1 minute? Where would this sliding window be located in my topology?
>>>>>>
>>>>>> Thank you in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Daniela
>>>>>>
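To make the placement of the window concrete: the 1-minute tumbling window Arun suggests is attached to the aggregation bolt itself, and that bolt is wired into the topology like any other component. The wiring below is a sketch that reuses the illustrative RedisScanSpout and DedupSumBolt from the sketches above; the component names, parallelism, grouping field, and timeout value are assumptions.

import java.util.concurrent.TimeUnit;

import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;

// Illustrative wiring only; not a complete, submittable topology.
public class SumTopologySketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("devices", new RedisScanSpout(), 1);

        // 1-minute tumbling window: the window closes, and the aggregation runs,
        // exactly once per minute
        builder.setBolt("sum",
                new DedupSumBolt()
                        .withTumblingWindow(new BaseWindowedBolt.Duration(1, TimeUnit.MINUTES)),
                1)
               .fieldsGrouping("devices", new Fields("deviceId"));

        Config conf = new Config();
        // with acking enabled, the message timeout must be larger than the window length
        conf.setMessageTimeoutSecs(120);

        // submit via LocalCluster or StormSubmitter as usual
    }
}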
>>>>>> -----Original Message-----
>>>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>>>> Sent: Saturday, April 9, 2016 12:13
>>>>>> To: dev@storm.apache.org
>>>>>> Subject: Re: Use only latest values
>>>>>>
>>>>>> Sounds reasonable.
>>>>>>
>>>>>>
>>>>>> On 04/09/2016 08:34 AM, Daniela Stoiber wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I would like to cache values and to use only the latest "valid"
>>>>>>> values to build a sum.
>>>>>>>
>>>>>>> In more detail, I receive values from devices periodically. I would
>>>>>>> like to add up all the valid values each minute. But not every
>>>>>>> device sends a new value every minute. And as long as there is no
>>>>>>> new value, the old one should be used for the sum. As soon as I
>>>>>>> receive a new value from a device, I would like to overwrite the old
>>>>>>> value and to use the new one for the sum. Would that be possible
>>>>>>> with the combination of Storm and Redis?
>>>>>>>
>>>>>>> My idea was to use the following:
>>>>>>>
>>>>>>> - Kafka Spout
>>>>>>>
>>>>>>> - Storm Bolt for storing the tuples in Redis and for overwriting the
>>>>>>>   values as soon as a new one is delivered
>>>>>>>
>>>>>>> - Storm Bolt for reading the latest tuples from Redis
>>>>>>>
>>>>>>> - Storm Bolt for grouping (I would like to group the devices per
>>>>>>>   region)
>>>>>>>
>>>>>>> - Storm Bolt for aggregation
>>>>>>>
>>>>>>> - Storm Bolt for storing the results again in Redis
>>>>>>>
>>>>>>> Thank you in advance.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Daniela
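For the second bolt in Daniela's list (storing each incoming tuple in Redis and overwriting the previous value of that device), one minimal sketch is to write into a Redis hash, where HSET overwrites an existing field and therefore gives the "keep only the latest value per device" behaviour for free. The class name, the hash key, the tuple field names, and the use of Jedis are assumptions for illustration; the ready-made Redis bolts in the storm-redis module are an alternative to hand-rolling this.

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import redis.clients.jedis.Jedis;

// Illustrative sketch only: upserts (deviceId, value) pairs into a Redis hash.
public class RedisUpsertBolt extends BaseRichBolt {

    private OutputCollector collector;
    private Jedis jedis;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.jedis = new Jedis("localhost", 6379); // assumed Redis location
    }

    @Override
    public void execute(Tuple input) {
        // HSET overwrites the field if it already exists, so only the latest
        // value per device is kept (the value is assumed to arrive as a string)
        jedis.hset("device_values",
                   input.getStringByField("deviceId"),
                   input.getStringByField("value"));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt in this branch of the topology; nothing to declare
    }
}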