If I understand you correctly (I have never used Redis), you put the data
into a table. In Spout.nextTuple() you do a table scan, remember when you
did it, and do the next table scan a minute later.

Just make sure to return from Spout.nextTuple(), and not block in it, while
waiting for the next scan!

Something like:

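private static final long SCAN_INTERVAL_MS = 60 * 1000L; // one scan per minute
private long nextScan = Long.MIN_VALUE;                  // scan on the first call

@Override
public void nextTuple() {
  long now = System.currentTimeMillis();
  if (now >= nextScan) {
    // do the table scan and emit all tuples
    nextScan = now + SCAN_INTERVAL_MS;
  }
  // else: return immediately; Storm calls nextTuple() again later
}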
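Because the scheduling logic above does not depend on Storm, it can be sketched and tested in plain Java. This is only an illustration under assumptions: `MinuteScanner` is a hypothetical helper, the clock value `now` is passed in instead of being read from `System.currentTimeMillis()`, and an in-memory map stands in for the Redis table. In a real spout, `nextTuple()` would call `poll(System.currentTimeMillis())` and emit each returned entry through its `SpoutOutputCollector`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decides whether a scan is due and, if so, returns a
// snapshot of the "table" (an in-memory stand-in for the Redis data).
class MinuteScanner {
    static final long SCAN_INTERVAL_MS = 60_000L;

    private final Map<String, Integer> table; // device ID -> latest value
    private long nextScan = Long.MIN_VALUE;    // first poll scans immediately

    MinuteScanner(Map<String, Integer> table) {
        this.table = table;
    }

    // Never blocks: returns an empty map when no scan is due yet.
    Map<String, Integer> poll(long now) {
        if (now < nextScan) {
            return Collections.emptyMap();
        }
        nextScan = now + SCAN_INTERVAL_MS;
        return new HashMap<>(table); // copying the map plays the role of one table scan
    }
}
```

Returning an empty result instead of sleeping keeps nextTuple() non-blocking, which is the point made above.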




On 04/11/2016 02:04 PM, Daniela Stoiber wrote:
> Hi Matthias
> 
> Thank you very much for your reply.
> 
> How can I ensure that the spout fetches the values every minute?
> 
> Thank you in advance.
> 
> Regards,
> Daniela
> 
> 
> 2016-04-11 11:10 GMT+02:00 Matthias J. Sax <mj...@apache.org>:
> 
>> Hi,
>>
>> @Arun: thanks for correcting me (it's hard to keep up to date with the
>> latest changes these days... :))
>>
>> @Daniela:
>>  - processing time means that the windows are aligned to the wall-clock
>> time of your machine when you process your data; this implies some
>> non-determinism and no repeatability if you reprocess historical data
>>  - event time means that the windows are aligned to timestamps that are
>> encoded in your tuples (e.g., as an attribute); this allows for
>> deterministic processing, as the result of a computation is independent
>> of the time at which you perform the computation
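To make the distinction concrete: with event time, the window a tuple belongs to is computed from a timestamp carried in the tuple itself, so reprocessing the same data always yields the same windows and sums. A minimal sketch, assuming 1-minute tumbling windows and millisecond timestamps; `EventTimeWindows` is a hypothetical helper, not part of Storm.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: groups (timestamp, value) pairs into 1-minute tumbling
// windows by their *event* timestamp and sums each window.
class EventTimeWindows {
    static final long WINDOW_MS = 60_000L;

    // Start of the tumbling window that contains the given event timestamp.
    static long windowStart(long eventTs) {
        return Math.floorDiv(eventTs, WINDOW_MS) * WINDOW_MS;
    }

    // Each event is {eventTimestamp, value}. The result maps window start to
    // the sum of its values; it depends only on the data, not on when this
    // method runs, so it is repeatable.
    static Map<Long, Integer> sumPerWindow(List<long[]> events) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (long[] e : events) {
            sums.merge(windowStart(e[0]), (int) e[1], Integer::sum);
        }
        return sums;
    }
}
```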
>>
>> For your Redis idea:
>>
>> You can certainly do this. A spout fetches all data from Redis each
>> minute and forwards it to the window bolt. I.e., you do not fetch directly
>> within your agg-bolt.
>>
>> However, if you use a custom aggregate function, you might be able to do
>> this without Redis in between and de-duplicate in your aggregate function
>> directly. When the window closes, iterate over all values in the window
>> and store the value for each device in a hash map (key: device ID); if a
>> second value for the same device comes in, you overwrite the first one.
>> As long as you do not have too many devices (i.e., the window and the
>> hash map fit in memory), this should be the simplest approach.
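A minimal sketch of such a de-duplicating aggregate in plain Java, so it runs without Storm. It assumes each closed window is handed over as a list of (device ID, value) readings in arrival order, and it keeps the hash map as a field across windows so that a device that did not report in the current window still contributes its last value (as required in the original question). `LatestValueSum` and `onWindowClosed` are hypothetical names; in Storm, this logic would sit in a windowed bolt's `execute(TupleWindow)` method.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical de-duplicating aggregate: keeps only the latest value per device.
class LatestValueSum {
    // device ID -> latest value; kept across windows (assumption, see above)
    private final Map<String, Integer> latest = new HashMap<>();

    // Called once per closed window. Readings are in arrival order, so a later
    // reading for the same device overwrites an earlier one.
    long onWindowClosed(List<? extends Map.Entry<String, Integer>> readings) {
        for (Map.Entry<String, Integer> r : readings) {
            latest.put(r.getKey(), r.getValue());
        }
        // Sum the most recent value of every device seen so far.
        long sum = 0;
        for (int v : latest.values()) {
            sum += v;
        }
        return sum;
    }
}
```

Keeping the map between windows is what makes an old value stay valid until the device sends a new one; the memory cost grows with the number of devices, not with the number of readings.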
>>
>>
>> -Matthias
>>
>>
>> On 04/10/2016 10:46 PM, Daniela Stoiber wrote:
>>> Hi Arun,
>>>
>>> thank you for your reply.
>>>
>>> But my problem is that I need to add up the values over all devices, but
>>> I am only allowed to use the most recent value of each device. A value is
>>> valid as long as there is no new value for this device available.
>>>
>>> So if I receive a message with device A with value 1, value 1 should be
>>> used for the sum as long as the value of A does not change.
>>> When I receive a new value for A, the new value should be used for the
>>> sum and the old one should be replaced.
>>>
>>> Therefore I thought of using Redis to store this information:
>>> Device    Value
>>> A         1
>>> B         10
>>> C         4
>>>
>>> Then, every minute, I would like to pull the most recent value of each
>>> device from Redis to build the sum. For this I would like to use the
>>> windowed bolt. But I am not sure whether it is possible to pull data out
>>> of Redis within a windowed bolt.
>>>
>>> Thank you in advance.
>>>
>>> Regards,
>>> Daniela
>>>
>>>
>>> -----Original Message-----
>>> From: Arun Iyer [mailto:ai...@hortonworks.com] On Behalf Of Arun Mahadevan
>>> Sent: Sunday, April 10, 2016 20:55
>>> To: dev@storm.apache.org
>>> Subject: Re: Re: Use only latest values
>>>
>>> Hi Matthias,
>>>
>>> WindowedBolt does support event time. In Trident it is not yet exposed.
>>>
>>> Hi Daniela,
>>>
>>> You could solve your use case in different ways. One would be to have a
>>> WindowedBolt with a 1 min tumbling window, do your custom aggregation
>>> (e.g. sum) every time the window tumbles, and emit the results to another
>>> bolt where you update the count in Redis. Most of your state saving could
>>> also be automated by defining a stateful bolt that would periodically
>>> checkpoint your state (sum per device). You could also combine windowing
>>> and state in a StatefulWindowedBolt implementation. You can evaluate the
>>> options and decide based on your use case.
>>>
>>> Take a look at the sample topologies (SlidingWindowTopology,
>>> SlidingTupleTsTopology, StatefulTopology, StatefulWindowingTopology) in
>>> storm-starter and the docs for more info.
>>>
>>> https://github.com/apache/storm/blob/master/docs/Windowing.md
>>>
>>> https://github.com/apache/storm/blob/master/docs/State-checkpointing.md
>>>
>>>
>>> -Arun
>>>
>>>
>>>
>>>
>>> On 4/10/16, 4:30 PM, "Matthias J. Sax" <mj...@apache.org> wrote:
>>>
>>>> A tumbling window (ie, non-overlapping window) is the right approach (a
>>>> sliding window is overlapping).
>>>>
>>>> The window goes into your aggregation bolt (windowing and aggregation
>>>> go hand in hand, i.e., when the window gets closed, the aggregation is
>>>> triggered and the window content is handed over to the aggregation
>>>> function).
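The difference between the two window types can be sketched by listing which windows contain a given timestamp. This assumes windows of a fixed length that start at multiples of the slide interval; a tumbling window is the case where the slide equals the length, so every tuple lands in exactly one window. `WindowMembership` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: start times of all windows [start, start + length)
// that contain timestamp ts, for windows starting at multiples of slide.
class WindowMembership {
    static List<Long> windowsContaining(long ts, long length, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= ts.
        long last = Math.floorDiv(ts, slide) * slide;
        // Walk back one slide at a time while the window still covers ts.
        for (long start = last; start > ts - length; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 60 s length, a 60 s slide (tumbling) puts a tuple at t = 90 s only into the window starting at 60 s, while a 20 s slide (sliding) puts the same tuple into three overlapping windows starting at 80 s, 60 s and 40 s.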
>>>>
>>>> Be aware that Storm (currently) only supports processing-time windows
>>>> (and no event-time windows).
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 04/10/2016 09:56 AM, Daniela Stoiber wrote:
>>>>> Hi,
>>>>>
>>>>> thank you for your reply.
>>>>>
>>>>> How can I ensure that the latest values are pulled from Redis and the
>>>>> sum is updated every minute? Do I need a sliding window with an interval
>>>>> of 1 minute? Where would this sliding window be located in my topology?
>>>>>
>>>>> Thank you in advance.
>>>>>
>>>>> Regards,
>>>>> Daniela
>>>>>
>>>>> -----Original Message-----
>>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>>> Sent: Saturday, April 9, 2016 12:13
>>>>> To: dev@storm.apache.org
>>>>> Subject: Re: Use only latest values
>>>>>
>>>>> Sounds reasonable.
>>>>>
>>>>>
>>>>> On 04/09/2016 08:34 AM, Daniela Stoiber wrote:
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I would like to cache values and to use only the latest "valid"
>>>>>> values to build a sum.
>>>>>>
>>>>>> In more detail, I receive values from devices periodically. I would
>>>>>> like to add up all the valid values each minute. But not every
>>>>>> device sends a new value every minute, and as long as there is no
>>>>>> new value, the old one should be used for the sum. As soon as I
>>>>>> receive a new value from a device, I would like to overwrite the old
>>>>>> value and use the new one for the sum. Would that be possible
>>>>>> with the combination of Storm and Redis?
>>>>>>
>>>>>>
>>>>>>
>>>>>> My idea was to use the following:
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Kafka Spout
>>>>>>
>>>>>> - Storm Bolt for storing the tuples in Redis and for overwriting the
>>>>>> values as soon as a new one is delivered
>>>>>>
>>>>>> - Storm Bolt for reading the latest tuples from Redis
>>>>>>
>>>>>> - Storm Bolt for grouping (I would like to group the devices per
>>>>>> region)
>>>>>>
>>>>>> - Storm Bolt for aggregation
>>>>>>
>>>>>> - Storm Bolt for storing the results again in Redis
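The grouping and aggregation stages of this pipeline could be sketched as a per-region sum over the latest value of each device. This assumes a device-to-region mapping is available; the `RegionSums` class and the `regionOf` map are hypothetical. In Storm, the grouping step would typically be a `fieldsGrouping` on the region field, so that all devices of one region reach the same aggregation task.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: sums the latest value of every device per region.
class RegionSums {
    static Map<String, Long> sumPerRegion(Map<String, Integer> latestByDevice,
                                          Map<String, String> regionOf) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> e : latestByDevice.entrySet()) {
            // Devices without a known region are grouped under "unknown".
            String region = regionOf.getOrDefault(e.getKey(), "unknown");
            sums.merge(region, e.getValue().longValue(), Long::sum);
        }
        return sums;
    }
}
```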
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thank you in advance.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Daniela
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
> 
