Of course, you also need to advance ts for the next table scan:

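> @Override
> public void nextTuple() {
>   long time = System.currentTimeMillis();
>   if (ts < time) {
>      // do table scan and emit all tuples
>      // next scan is due one minute (60000 ms) after the previous one
>      ts = ts == Long.MIN_VALUE ? time + 60000 : ts + 60000;
>   }
>   // else do nothing: return immediately instead of blocking
> }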
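The timing check in nextTuple() can be pulled into a small helper so it can be tested without a Storm cluster. This is a minimal sketch, not Storm API: the class ScanScheduler and its shouldScan() method are hypothetical names, and the current time is passed in (instead of calling System.currentTimeMillis()) so the behaviour is deterministic.

```java
/**
 * Hypothetical helper that isolates the "scan at most once per interval"
 * check from nextTuple(), so it can be tested without a Storm cluster.
 */
public class ScanScheduler {
    private final long intervalMs;
    private long ts = Long.MIN_VALUE; // time after which the next scan is due

    public ScanScheduler(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Returns true if a scan should run now; never blocks. */
    public boolean shouldScan(long now) {
        if (ts < now) {
            // first scan: next one is due one interval from now;
            // afterwards: advance from the previous due time (fixed grid, no drift)
            ts = ts == Long.MIN_VALUE ? now + intervalMs : ts + intervalMs;
            return true;
        }
        return false; // not due yet: the caller returns from nextTuple() immediately
    }

    public static void main(String[] args) {
        ScanScheduler s = new ScanScheduler(60_000);
        System.out.println(s.shouldScan(1_000));  // true  (next scan due after 61000)
        System.out.println(s.shouldScan(30_000)); // false
        System.out.println(s.shouldScan(61_001)); // true  (next scan due after 121000)
    }
}
```

Advancing ts from the previous due time, rather than from the current time, keeps the scans on a fixed one-minute grid. The trade-off is that a spout that falls more than one interval behind will run several scans back to back until it catches up.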


On 04/11/2016 02:08 PM, Matthias J. Sax wrote:
> If I understand you correctly (I never used Redis), you put the data
> into a table. In Spout.nextTuple() you do a table scan, remember the current
> timestamp, and do the next table scan a minute later.
> 
> Just make sure to return, and not block, in Spout.nextTuple() while waiting
> for the next scan!
> 
> Something like:
> 
> private long ts = Long.MIN_VALUE;
> 
> @Override
> public void nextTuple() {
>   long time = System.currentTimeMillis();
>   if (ts < time) {
>      // do table scan and emit all tuples
>      ts = time; // remember when this scan happened
>   }
>   // else do nothing
> }
> 
> 
> 
> 
> On 04/11/2016 02:04 PM, Daniela Stoiber wrote:
>> Hi Matthias,
>>
>> Thank you very much for your reply.
>>
>> How can I ensure that the spout fetches the values every minute?
>>
>> Thank you in advance.
>>
>> Regards,
>> Daniela
>>
>>
>> 2016-04-11 11:10 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
>>
>>> Hi,
>>>
>>> @Arun: thanks for correcting me (it's hard to keep up to date with the
>>> latest changes these days... :))
>>>
>>> @Daniela:
>>>  - processing time means that the windows are aligned to the wall-clock
>>> time of your machine when you process your data; this implies some
>>> non-determinism and no repeatability if you process historical data
>>>  - event time means that the windows are aligned to timestamps that are
>>> encoded in your tuples (eg, as an attribute); this allows for
>>> deterministic processing, as the result of a computation is independent
>>> of the time you perform the computation
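To make the event-time point concrete: if the window is derived only from the timestamp stored in each tuple, reprocessing the same data always assigns it to the same windows. This is a minimal sketch, assuming millisecond epoch timestamps and tumbling windows aligned to multiples of the window size (a common convention chosen for illustration; Storm does not necessarily align its windows this way). The class EventTimeWindows is a hypothetical name.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeWindows {
    /** Start of the tumbling window that contains eventTs (both in ms). */
    static long windowStart(long eventTs, long sizeMs) {
        return Math.floorDiv(eventTs, sizeMs) * sizeMs;
    }

    /** Counts events per window using only the timestamps carried by the events. */
    static Map<Long, Integer> countPerWindow(List<Long> eventTimestamps, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>(); // sorted by window start
        for (long ts : eventTimestamps) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> events = List.of(5_000L, 59_999L, 60_000L, 125_000L);
        // The result depends only on the input, not on when this code runs.
        System.out.println(countPerWindow(events, 60_000)); // {0=2, 60000=1, 120000=1}
    }
}
```

A processing-time variant would call System.currentTimeMillis() instead of reading the tuple's timestamp, so the same input could land in different windows on every run.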
>>>
>>> For your Redis idea:
>>>
>>> You can certainly do this. A Spout fetches all data from Redis each
>>> minute and forwards it to the window bolt. Ie, you do not fetch directly
>>> within your agg-bolt.
>>>
>>> However, if you use a custom aggregate function, you might be able to do
>>> this without Redis in between and de-duplicate in your aggregate function
>>> directly. When the window closes, iterate over all values in the window
>>> and store the value for each device in a hash-map (key: Device-ID). If a
>>> second value for the same device comes in, you overwrite the first one.
>>> As long as you do not have too many devices (ie, the window and the
>>> hash-map fit into memory) this should be the simplest approach.
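A minimal sketch of the de-duplicating aggregate described above, in plain Java with no Storm dependency. The class LatestValueSum, its nested Reading type and onWindow() are hypothetical; in a real topology the loop would run over the tuples of the closed window. One assumption goes beyond the suggestion above: the map is kept as a field instead of being rebuilt per window, so a device's last value stays in the sum until a newer value replaces it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical aggregate: keeps the latest value per device and sums them. */
public class LatestValueSum {
    /** One reading from a device (stand-in for the fields of a tuple). */
    public static final class Reading {
        final String deviceId;
        final long value;

        public Reading(String deviceId, long value) {
            this.deviceId = deviceId;
            this.value = value;
        }
    }

    // key: device ID; kept across windows so old values remain valid
    private final Map<String, Long> latest = new HashMap<>();

    /** Called when a window closes; readings are assumed to be in arrival order. */
    public long onWindow(List<Reading> window) {
        for (Reading r : window) {
            latest.put(r.deviceId, r.value); // a later value overwrites an earlier one
        }
        long sum = 0;
        for (long v : latest.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        LatestValueSum agg = new LatestValueSum();
        System.out.println(agg.onWindow(List.of(
                new Reading("A", 1), new Reading("B", 10), new Reading("C", 4)))); // 15
        // Only A reports in the next window; B and C keep their previous values.
        System.out.println(agg.onWindow(List.of(new Reading("A", 3)))); // 17
    }
}
```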
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 04/10/2016 10:46 PM, Daniela Stoiber wrote:
>>>> Hi Arun,
>>>>
>>>> thank you for your reply.
>>>>
>>>> But my problem is that I need to add up the values over all devices, but
>>>> I am only allowed to use the most recent value of each device. A value is
>>>> valid as long as there is no newer value available for this device.
>>>>
>>>> So if I receive a message from device A with value 1, value 1 should be
>>>> used for the sum as long as the value of A does not change.
>>>> When I receive a new value for A, the new value should be used for the
>>>> sum and the old one should be replaced.
>>>>
>>>> Therefore I thought of using Redis to store this information:
>>>> Device   Value
>>>> A        1
>>>> B        10
>>>> C        4
>>>> Then I would like to pull the most recent value of each device every
>>>> minute to build the sum. For this I would like to use the windowed bolt.
>>>> But I am not sure whether it is possible to pull data out of Redis
>>>> within a windowed bolt.
>>>>
>>>> Thank you in advance.
>>>>
>>>> Regards,
>>>> Daniela
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Arun Iyer [mailto:ai...@hortonworks.com] On Behalf Of Arun Mahadevan
>>>> Sent: Sunday, April 10, 2016 20:55
>>>> To: dev@storm.apache.org
>>>> Subject: Re: AW: Use only latest values
>>>>
>>>> Hi Matthias,
>>>>
>>>> WindowedBolt does support event time. In Trident it is not yet exposed.
>>>>
>>>> Hi Daniela,
>>>>
>>>> You could solve your use case in different ways. One would be to have a
>>>> WindowedBolt with a 1 min tumbling window, do your custom aggregation (e.g.
>>>> sum) every time the window tumbles and emit the results to another bolt
>>>> where you update the sum in Redis. Most of your state saving could also
>>>> be automated by defining a Stateful bolt that would periodically checkpoint
>>>> your state (sum per device). You could also combine both windowing and
>>>> state into a StatefulWindowedBolt implementation. You can evaluate the
>>>> options and decide based on your use case.
>>>>
>>>> Take a look at the sample topologies (SlidingWindowTopology,
>>>> SlidingTupleTsTopology, StatefulTopology, StatefulWindowingTopology) in
>>>> storm-starter and at the docs for more info.
>>>>
>>>> https://github.com/apache/storm/blob/master/docs/Windowing.md
>>>>
>>>> https://github.com/apache/storm/blob/master/docs/State-checkpointing.md
>>>>
>>>>
>>>> -Arun
>>>>
>>>>
>>>>
>>>>
>>>> On 4/10/16, 4:30 PM, "Matthias J. Sax" <mj...@apache.org> wrote:
>>>>
>>>>> A tumbling window (ie, non-overlapping window) is the right approach (a
>>>>> sliding window is overlapping).
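The difference between tumbling and sliding windows shows up in how many windows a single tuple belongs to. This is a minimal sketch, assuming windows of the form [start, start + length) whose start times are multiples of the slide interval; WindowMembership and windowsContaining() are hypothetical names, and Storm's own windows are not necessarily aligned this way.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowMembership {
    /**
     * Start times of all windows of length lengthMs, starting every slideMs,
     * that contain timestamp t. A tumbling window is the case slideMs == lengthMs.
     */
    static List<Long> windowsContaining(long t, long lengthMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = Math.floorDiv(t, slideMs) * slideMs; // latest window start <= t
        for (long s = lastStart; s > t - lengthMs; s -= slideMs) {
            starts.add(0, s); // prepend to keep ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        long t = 70_000; // a tuple arriving at t = 70 s
        System.out.println(windowsContaining(t, 60_000, 60_000)); // tumbling: [60000]
        System.out.println(windowsContaining(t, 60_000, 20_000)); // sliding: [20000, 40000, 60000]
    }
}
```

With a 1-minute tumbling window each device reading is aggregated exactly once; with a sliding window it would contribute to several overlapping sums.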
>>>>>
>>>>> The window goes into your aggregation bolt (windowing and aggregation
>>>>> go hand in hand, ie, when the window gets closed, the aggregation is
>>>>> triggered and the window content is handed over to the aggregation
>>>>> function).
>>>>>
>>>>> Be aware that Storm (currently) only supports processing-time windows
>>>>> (and no event-time windows).
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 04/10/2016 09:56 AM, Daniela Stoiber wrote:
>>>>>> Hi,
>>>>>>
>>>>>> thank you for your reply.
>>>>>>
>>>>>> How can I ensure that the latest values are pulled from Redis and the sum
>>>>>> is updated every minute? Do I need a sliding window with an interval
>>>>>> of 1 minute? Where would this sliding window be located in my topology?
>>>>>>
>>>>>> Thank you in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Daniela
>>>>>>
>>>>>> -----Original Message-----
>>>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>>>> Sent: Saturday, April 9, 2016 12:13
>>>>>> To: dev@storm.apache.org
>>>>>> Subject: Re: Use only latest values
>>>>>>
>>>>>> Sounds reasonable.
>>>>>>
>>>>>>
>>>>>> On 04/09/2016 08:34 AM, Daniela Stoiber wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I would like to cache values and to use only the latest "valid"
>>>>>>> values to build a sum.
>>>>>>>
>>>>>>> In more detail, I receive values from devices periodically. I would
>>>>>>> like to add up all the valid values each minute. But not every
>>>>>>> device sends a new value every minute, and as long as there is no
>>>>>>> new value, the old one should be used for the sum. As soon as I
>>>>>>> receive a new value from a device, I would like to overwrite the old
>>>>>>> value and use the new one for the sum. Would that be possible
>>>>>>> with the combination of Storm and Redis?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> My idea was to use the following:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Kafka Spout
>>>>>>>
>>>>>>> - Storm Bolt for storing the tuples in Redis and for overwriting the
>>>>>>> values as soon as a new one is delivered
>>>>>>>
>>>>>>> - Storm Bolt for reading the latest tuples from Redis
>>>>>>>
>>>>>>> - Storm Bolt for grouping (I would like to group the devices per
>>>>>>> region)
>>>>>>>
>>>>>>> - Storm Bolt for aggregation
>>>>>>>
>>>>>>> - Storm Bolt for storing the results again in Redis
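The intended semantics of this pipeline (overwrite per device, group per region, aggregate) can be checked in memory before wiring up Kafka and Redis. This is a minimal sketch: the class RegionSums is hypothetical, a HashMap stands in for the Redis table (so there is no persistence and no sharing between workers), and the region names in main() are made-up example data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory stand-in for the store, read, group and aggregate bolts. */
public class RegionSums {
    private final Map<String, Long> valueByDevice = new HashMap<>();   // plays the Redis table
    private final Map<String, String> regionByDevice = new HashMap<>();

    /** "Store" step: overwrite the device's previous value. */
    public void update(String deviceId, String region, long value) {
        valueByDevice.put(deviceId, value);
        regionByDevice.put(deviceId, region);
    }

    /** "Read + group + aggregate" steps, e.g. run once per minute. */
    public Map<String, Long> snapshot() {
        Map<String, Long> sums = new TreeMap<>(); // sorted by region for stable output
        for (Map.Entry<String, Long> e : valueByDevice.entrySet()) {
            sums.merge(regionByDevice.get(e.getKey()), e.getValue(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        RegionSums rs = new RegionSums();
        rs.update("A", "north", 1);
        rs.update("B", "north", 10);
        rs.update("C", "south", 4);
        rs.update("A", "north", 3); // newer value for A replaces 1
        System.out.println(rs.snapshot()); // {north=13, south=4}
    }
}
```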
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thank you in advance.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Daniela
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
> 
