Hi,

@Arun: thanks for correcting me (it's hard to keep up to date with the
latest changes these days... :))

@Daniela:
 - processing time means that the windows are aligned to the wall-clock
time of your machine at the moment you process your data; this implies some
non-determinism and non-repeatability if you process historical data
 - event time means that the windows are aligned to timestamps that are
encoded in your tuples (eg, as an attribute); this allows for
deterministic processing, as the result of a computation is independent
of the time you perform the computation
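To make the difference concrete, here is a small, Storm-independent Java sketch (class and method names are made up for illustration, this is not Storm's windowing API) that assigns a tuple to a 1-minute tumbling window either by a timestamp carried in the tuple (event time) or by the wall clock at processing time:

```java
public class WindowAssignment {
    // Assumed window size: 1 minute, matching the use case in this thread.
    static final long WINDOW_MS = 60_000L;

    // Start of the 1-minute tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // Event time: the window is derived from a timestamp carried in the tuple,
    // so processing the same tuples again always gives the same window.
    static long eventTimeWindow(long tupleTimestampMs) {
        return windowStart(tupleTimestampMs);
    }

    // Processing time: the window is derived from the wall clock at the moment
    // the tuple is processed, so replaying historical data lands in other windows.
    static long processingTimeWindow() {
        return windowStart(System.currentTimeMillis());
    }

    public static void main(String[] args) {
        long[] tupleTimestamps = {0L, 59_999L, 60_000L, 125_000L};
        for (long ts : tupleTimestamps) {
            System.out.println(ts + " -> event-time window " + eventTimeWindow(ts)
                    + ", processing-time window " + processingTimeWindow());
        }
    }
}
```

Re-running it later yields the same event-time windows, while the processing-time windows simply follow the clock.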

For your Redis idea:

You can certainly do this. A Spout fetches all data from Redis each
minute and forwards it to the window bolt. Ie, you do not fetch directly
within your agg-bolt.
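A rough sketch of this flow, assuming the device values live in a single Redis hash: a plain in-memory map stands in for Redis here (put() mimics HSET overwriting a field, snapshot() mimics HGETALL reading all fields), so the example runs without a Redis client. LatestValueStore and RedisSnapshotSketch are hypothetical names, not Storm or Redis APIs. In a real topology, the spout would take the snapshot once per minute and emit it, and the summing would happen in the window bolt.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a Redis hash keyed by device ID.
class LatestValueStore {
    private final Map<String, Long> values = new HashMap<>();

    // Writer-bolt side (like HSET): a newer value for a device replaces the old one.
    void put(String deviceId, long value) {
        values.put(deviceId, value);
    }

    // Spout side (like HGETALL): a copy of all current values, taken once per minute.
    Map<String, Long> snapshot() {
        return new HashMap<>(values);
    }
}

public class RedisSnapshotSketch {
    // What the window bolt would compute from one snapshot: one value per device.
    static long sumOfSnapshot(Map<String, Long> snapshot) {
        long sum = 0;
        for (long v : snapshot.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        LatestValueStore store = new LatestValueStore();
        store.put("A", 1);
        store.put("B", 10);
        store.put("C", 4);
        System.out.println(sumOfSnapshot(store.snapshot())); // minute 1
        store.put("A", 3); // new value for A replaces the old one
        System.out.println(sumOfSnapshot(store.snapshot())); // minute 2
    }
}
```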

However, if you use a custom aggregate function, you might be able to do
this without Redis in between and de-duplicate in your aggregate function
directly. When the window closes, your aggregate function iterates over all
values in the window and stores the value of each device in a hash-map
(key: Device-ID). If a second value for the same device comes in, you
overwrite the first one. As long as you do not have too many devices (ie,
the window and hash-map fit in memory), this should be the simplest approach.
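A minimal sketch of such an aggregate function in plain Java (LatestValueSum and Reading are hypothetical names). Since a device's value should stay valid until that device sends a new one, this sketch assumes the map is kept across windows as a field of the bolt rather than rebuilt per window. In a Storm BaseWindowedBolt, onWindowClosed would roughly correspond to the body of execute(TupleWindow), iterating over inputWindow.get().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatestValueSum {
    // One device reading; in Storm this would come from a tuple's fields.
    static final class Reading {
        final String deviceId;
        final long value;

        Reading(String deviceId, long value) {
            this.deviceId = deviceId;
            this.value = value;
        }
    }

    // Latest value per device (key: device ID). Kept as a field so a device's
    // value stays valid in later windows until that device sends a new one.
    private final Map<String, Long> latest = new HashMap<>();

    // Called once per closed window with the window's readings in arrival order.
    long onWindowClosed(List<Reading> window) {
        for (Reading r : window) {
            latest.put(r.deviceId, r.value); // a later reading overwrites an earlier one
        }
        long sum = 0;
        for (long v : latest.values()) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        LatestValueSum agg = new LatestValueSum();
        // Window 1: A reports twice, so only its second value (2) counts.
        System.out.println(agg.onWindowClosed(Arrays.asList(
                new Reading("A", 1), new Reading("B", 10), new Reading("A", 2))));
        // Window 2: only C reports; A and B keep their previous values.
        System.out.println(agg.onWindowClosed(Arrays.asList(new Reading("C", 4))));
    }
}
```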


-Matthias


On 04/10/2016 10:46 PM, Daniela Stoiber wrote:
> Hi Arun,
> 
> thank you for your reply.
> 
> But my problem is that I need to add up the values over all devices, but I am 
> only allowed to use the most recent value of each device. A value is valid as 
> long as there is no new value for this device available. 
> 
> So if I receive a message with device A with value 1, value 1 should be used 
> for the sum as long as the value of A does not change. 
> When I receive a new value for A, the new value should be used for the sum 
> and the old one should be replaced.
> 
> Therefore I thought to use Redis to store this information:
> Device    Value
> A         1
> B         10
> C         4
> 
> Then, every minute, I would like to pull the most recent value of each device 
> to build the sum. For this I would like to use the windowed bolt. But I am 
> not sure if it is possible to pull data out of Redis within a windowed bolt.
> 
> Thank you in advance.
> 
> Regards,
> Daniela
> 
> 
> -----Original Message-----
> From: Arun Iyer [mailto:ai...@hortonworks.com] On Behalf Of Arun Mahadevan
> Sent: Sunday, April 10, 2016 20:55
> To: dev@storm.apache.org
> Subject: Re: AW: Use only latest values
> 
> Hi Matthias, 
> 
> WindowedBolt does support event time. In Trident it is not yet exposed.
> 
> Hi Daniela,
> 
> You could solve your use cases in different ways. One would be to have a 
> WindowedBolt with a 1 min tumbling window, do your custom aggregation (e.g. 
> sum) every time the window tumbles and emit the results to another bolt where 
> you update the count in Redis. Most of your state saving could also be 
> automated by defining a Stateful bolt that would periodically checkpoint your 
> state (sum per device). You could also combine both windowing and state into a 
> StatefulWindowedBolt implementation. You can evaluate the options and decide 
> based on your use cases.
> 
> Take a look at the sample topologies (SlidingWindowTopology, 
> SlidingTupleTsTopology, StatefulTopology, StatefulWindowingTopology) in 
> storm-starter and the docs for more info.
> 
> https://github.com/apache/storm/blob/master/docs/Windowing.md
> 
> https://github.com/apache/storm/blob/master/docs/State-checkpointing.md
> 
> 
> -Arun
> 
> 
> 
> 
> On 4/10/16, 4:30 PM, "Matthias J. Sax" <mj...@apache.org> wrote:
> 
>> A tumbling window (ie, non-overlapping window) is the right approach (a 
>> sliding window is overlapping).
>>
>> The window goes into your aggregation bolt (windowing and aggregation 
>> go hand in hand, ie, when the window gets closed, the aggregation is 
>> triggered and the window content is handed over to the aggregation 
>> function).
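As a small illustration of the tumbling vs. sliding difference (plain Java, not Storm's windowing API; the method name is made up): with a tumbling window, each reading falls into exactly one window and is therefore aggregated exactly once, while with a sliding window the same reading is counted in several overlapping windows.

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingVsSliding {
    // Start times of all windows [start, start + lengthMs) that contain timestampMs,
    // for windows that begin every slideMs. Tumbling windows are the case slideMs == lengthMs.
    static List<Long> windowsContaining(long timestampMs, long lengthMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - lengthMs; start -= slideMs) {
            starts.add(0, start); // prepend so the list ends up in ascending order
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 90_000L; // a reading at 1m30s
        // Tumbling 1-minute window: the reading is counted in exactly one window.
        System.out.println(windowsContaining(ts, minute, minute));
        // 1-minute window sliding every 30s: the same reading lands in two windows.
        System.out.println(windowsContaining(ts, minute, 30_000L));
    }
}
```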
>>
>> Be aware that Storm (currently) only supports processing-time windows 
>> (and no event-time windows).
>>
>> -Matthias
>>
>>
>> On 04/10/2016 09:56 AM, Daniela Stoiber wrote:
>>> Hi,
>>>
>>> thank you for your reply.
>>>
>>> How can I ensure that the latest values are pulled from Redis and the sum 
>>> is updated every minute? Do I need a sliding window with an interval 
>>> of 1 minute? Where would this sliding window be located in my topology?
>>>
>>> Thank you in advance.
>>>
>>> Regards,
>>> Daniela
>>>
>>> -----Original Message-----
>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>> Sent: Saturday, April 9, 2016 12:13
>>> To: dev@storm.apache.org
>>> Subject: Re: Use only latest values
>>>
>>> Sounds reasonable.
>>>
>>>
>>> On 04/09/2016 08:34 AM, Daniela Stoiber wrote:
>>>> Hi,
>>>>
>>>>
>>>> I would like to cache values and to use only the latest "valid" 
>>>> values to build a sum.
>>>>
>>>> In more detail, I receive values from devices periodically. I would 
>>>> like to add up all the valid values each minute. But not every 
>>>> device sends a new value every minute. And as long as there is no 
>>>> new value the old one should be used for the sum. As soon as I 
>>>> receive a new value from a device I would like to overwrite the old 
>>>> value and to use the new one for the sum. Would that be possible 
>>>> with the combination of Storm and Redis?
>>>>
>>>> My idea was to use the following:
>>>>
>>>> - Kafka Spout
>>>>
>>>> - Storm Bolt for storing the tuples in Redis and for overwriting the 
>>>> values as soon as a new one is delivered
>>>>
>>>> - Storm Bolt for reading the latest tuples from Redis
>>>>
>>>> - Storm Bolt for grouping (I would like to group the devices per
>>>> region)
>>>>
>>>> - Storm Bolt for aggregation
>>>>
>>>> - Storm Bolt for storing the results again in Redis
>>>>
>>>> Thank you in advance.
>>>>
>>>> Regards,
>>>>
>>>> Daniela
>>>>
>>>>
>>>
>>>
>>
> 
> 
