Glad it works.

Windows seem more natural here, and they also allow you to "expire old
windows eventually". With your current approach, you never delete old
windows: each hour creates a new entry in the internal key-value store,
so your store grows unbounded over time. Windowing should work out of
the box, because windows are automatically aligned:

https://docs.confluent.io/current/streams/developer-guide.html#tumbling-time-windows

> Tumbling time windows are aligned to the epoch, with the lower interval bound 
> being inclusive and the upper bound being exclusive. “Aligned to the epoch” 
> means that the first window starts at timestamp zero. For example, tumbling 
> windows with a size of 5000ms have predictable window boundaries 
> [0;5000),[5000;10000),... — and not [1000;6000),[6000;11000),... or even 
> something “random” like [1452;6452),[6452;11452),....
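
To make the alignment concrete, here is a minimal sketch in plain Java
(no Kafka dependency) of the arithmetic the quote describes. The class
and method names are made up for illustration and are not part of the
Kafka Streams API. It assumes the record timestamp comes from your
"tss" field, converted to milliseconds (in Kafka Streams that would
need a custom TimestampExtractor).

```java
import java.time.Instant;

// Hypothetical helper, for illustration only (not a Kafka Streams class).
final class WindowAlignment {

    // Start (inclusive) of the epoch-aligned tumbling window that
    // contains timestampMs, for windows of length sizeMs.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long hourMs = 3_600_000L;
        // "tss" of the first sample record, converted from seconds to ms.
        long tsMs = 1507619473L * 1000L;

        long start = windowStart(tsMs, hourMs);
        long end = start + hourMs; // exclusive upper bound

        // Prints: [2017-10-10T07:00:00Z;2017-10-10T08:00:00Z)
        System.out.println("[" + Instant.ofEpochMilli(start)
                + ";" + Instant.ofEpochMilli(end) + ")");
    }
}
```

So a one-hour tumbling window always starts on the full hour in UTC,
which also matches the wall clock in any time zone whose offset is a
whole number of hours.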


-Matthias



On 10/11/17 1:42 AM, RedShift wrote:
> Matthias
> 
> 
> Thanks, using grouping key of "deviceId + timestamp" with *aggregation*
> _instead of_ reducing solved it:
> 
> 
>   KGroupedStream<String, JsonObject> grouped = data.groupBy(
>       (k, v) ->
>       {
>           Date dt = Date.from(Instant.ofEpochSecond(
>               v.get("data").asObject().get("tss").asLong()));
> 
>           return v.get("deviceId").asString() + dateFormat.format(dt);
>       }
>   );
> 
> 
>   KTable<String, Integer> aggregate = grouped.aggregate(
>       () -> 0,
>       (aggKey, value, aggr) ->
>           aggr + value.get("data").asObject().get("load").asInt(),
>       Serdes.Integer()
>   );
> 
> I'm still trying to find out how windowing fits in. It sounds like a
> tumbling window, but a tumbling window is defined by its length. So you
> get information for the last hour that has passed, but that last hour is
> a window of NOW - 1 hour. How do I get a window to align to hours of the
> clock?
> 
> 
> 
> On 10/10/2017 19:41, Matthias J. Sax wrote:
>> Hi,
>>
>> If the aggregation returns a different type, you can use .aggregate(...)
>> instead of .reduce(...).
>>
>> Also, for your time-based computation, did you consider using windowing?
>>
>>
>> -Matthias
>>
>> On 10/10/17 6:27 AM, RedShift wrote:
>>> Hi all
>>>
>>> Complete noob with regards to stream processing, this is my first
>>> attempt. I'm going to try and explain my thought process, here's what
>>> I'm trying to do:
>>>
>>> I would like to create a sum of "load" for every hour, for every device.
>>>
>>> Incoming stream of data:
>>>
>>> {"deviceId":"1234","data":{"tss":1507619473,"load":9}}
>>> {"deviceId":"1234","data":{"tss":1507619511,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619549,"load":5}}
>>> {"deviceId":"9876","data":{"tss":1507619587,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619625,"load":8}}
>>> {"deviceId":"1234","data":{"tss":1507619678,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619716,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619752,"load":9}}
>>> {"deviceId":"1234","data":{"tss":1507619789,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619825,"load":8}}
>>> {"deviceId":"9876","data":{"tss":1507619864,"load":8}}
>>>
>>> Where
>>> deviceId: unique ID for every device, which also doubles as the key I use
>>> tss: UNIX timestamp in seconds
>>> load: load indication
>>>
>>> Expected outcome something like this:
>>> deviceId: 1234, time: 2017-10-01 18:00, load: 25
>>> deviceId: 1234, time: 2017-10-01 19:00, load: 13
>>> deviceId: 9876, time: 2017-10-01 18:00, load: 33
>>> deviceId: 9876, time: 2017-10-01 19:00, load: 5
>>> ...
>>>
>>>
>>> So I started:
>>>
>>>    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");
>>> // Important bit here, I use this to construct a grouping key
>>>    KStreamBuilder builder = new KStreamBuilder();
>>>    KStream<String, JsonObject> data = builder.stream("telemetry");
>>>
>>> We need to group by device, so:
>>>
>>>    KGroupedStream<String, JsonObject> grouped = data.groupBy((k, v) ->
>>> v.get("deviceId").asString());
>>>
>>> But now I can't group the data again by date. So I made a combined
>>> grouping key like this:
>>>
>>>    KGroupedStream<String, JsonObject> grouped = data.groupBy(
>>>        (k, v) ->
>>>        {
>>>            Date dt = Date.from(Instant.ofEpochSecond(
>>>                v.get("data").asObject().get("tss").asLong()));
>>>
>>>            return v.get("deviceId").asString() + dateFormat.format(dt);
>>>        }
>>>    );
>>>
>>> Now I need to reduce the groups to sum the load:
>>>
>>>    grouped.reduce(new Reducer<JsonObject>()
>>>    {
>>>        @Override
>>>        public JsonObject apply(JsonObject v1, JsonObject v2)
>>>        {
>>>            return null;
>>>        }
>>>    });
>>>
>>> But that's a problem. I'm supposed to sum "load" here, but I also have
>>> to return a JsonObject. That doesn't seem right. So now I figure I have
>>> to extract the "load" before the reducer, but a KGroupedStream doesn't
>>> have a map() function.
>>>
>>> Back to the drawing board. So I figure let's extract the "load" and
>>> grouping key first:
>>>
>>>    KStream<Object, Object> map = data.map(new KeyValueMapper<String,
>>> JsonObject, KeyValue<?, ?>>()
>>>    {
>>>        @Override
>>>        public KeyValue<String, Integer> apply(String s, JsonObject v)
>>>        {
>>>            Date dt = Date.from(Instant.ofEpochSecond(
>>>                v.get("data").asObject().get("tss").asLong()));
>>>
>>>            String key = v.get("deviceId").asString()
>>>                + dateFormat.format(dt);
>>>
>>>            return new KeyValue<>(
>>>                key, v.get("data").asObject().get("load").asInt()
>>>            );
>>>        }
>>>    });
>>>
>>> But now I'm left with a KStream of <Object, Object>. I've lost my types.
>>> If I change it to:
>>> KStream<String, Integer>, the compiler has this to say:
>>>
>>> Error:(35, 54) java: incompatible types: inference variable KR has
>>> incompatible bounds
>>>      equality constraints: java.lang.String
>>>      lower bounds: java.lang.Object
>>>
>>> Makes sense, as there's no guarantee that an arbitrary object is a
>>> string. But how do I preserve types then?
>>>
>>> I'm also unsure about the way I'm grouping things. It seems to me I have
>>> to group by deviceId, and then using windowing to get the "per hour"
>>> part. But I'm even more clueless how and where that fits in. For some
>>> reason I also think a KTable should be the final result?
>>>
>>> Thanks,
>>>
>>> Best regards,
>>
