Hey all,
  Just wanted to follow up.  I have not had much time to work on this, been
trying to figure out why leaving the stream on for a few days causes some
of the stream apps to go into this death spiral of connecting/disconnecting.

  The store growing unbounded stopped me in my tracks and I haven't thought
of a good way around that yet.  Was reading the source code for the RockDB
Store and noticed that there's mention of handling TTL in there but its not
currently implemented.  And as of right now me knowledge of RocksDB is that
Kafka Streams uses it. :)  So not too comfortable trying to configure my
own RocksDB store just yet.

 I have thought about writing a custom processor for this that handles all
the windows in the same step (after the 1 minute window runs).  I  think it
would just store the 1min stuff and on punctuate() send out updates just
for the time slices that have changed.  The only way I've thought about
doing it is having two stores, one really to recreate the 1 minute store to
tell if we have seen the value before or not (to do the subtractor part of
a KTable.aggregate()) and another to hold the latest x minute window values.

Thoughts?

On Tue, May 9, 2017 at 9:21 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Just had a thought:
>
> If you implement the Windowed/Tuple serde to store the timestamp(s) before
> the actual record key then you can simply periodically do a ranged query on
> each of the state stores to find and delete all data older than ... (using
> punctuate() inside a Processor).
>
> Any downsides to that?
>
> Cheers,
>
> Michał
>
>
>
> On 09/05/17 09:17, Michal Borowiecki wrote:
>
>> Hi Matthias,
>>
>> Yes, the ever growing stores were my concern too. That was the intention
>> behind my TODO note in the first reply just didn't want to touch on this
>> until I've dug deeper into it.
>>
>> I understand compaction+retention policy on the backing changelog topics
>> takes care of cleaning up on the broker-side but Rocks dbs will grow
>> indefinitely, right? (until re-balanced?)
>>
>>
>> Punctuation was the first idea that came to my mind too when originally
>> faced this problem on my project. However, as you said it's only on KStream
>> and aggregations on KStream actually discard tombstones and don't forward
>> them on to the KTable:
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/
>> java/org/apache/kafka/streams/kstream/KGroupedStream.java#L798-L799
>>
>>      * Aggregate the values of records in this stream by the grouped key.
>>      * Records with {@code null} key or value are ignored.
>>
>> I haven't come up with a satisfactory solution yet, but it's still on my
>> mind.
>>
>>
>> TTLs on stores could potentially solve this issue and just today they
>> were asked about on SO: http://stackoverflow.com/quest
>> ions/43860114/kafka-streams-low-level-processor-api-
>> rocksdb-timetolivettl/43862922#43862922
>>
>> Garrett, was that you? :-)
>>
>>
>> Thanks,
>>
>> Michał
>>
>>
>> On 08/05/17 23:29, Matthias J. Sax wrote:
>>
>>> Thinking about this once more (and also having a fresh memory of another
>>> thread about KTables), I am wondering if this approach needs some extra
>>> tuning:
>>>
>>> As the result of the first window aggregation produces an output stream
>>> with unbounded key space, the following (non-windowed) KTables would
>>> grow indefinitely, if I don't miss anything.
>>>
>>> Thus, it might be required to put a transform() that only forwards all
>>> data 1-to-1, but additionally registers a punctuation schedule. When
>>> punctuation is called, it would be required to send tombstone messages
>>> downstream (or a simliar) that deletes windows that are older than the
>>> retention time. Sound tricky to implement though... `transform()` would
>>> need to keep track of used keys to send appropriate tombstones in an
>>> custom state. Also. `transform` is only available for KStream and
>>> transforming (windowed) KTable into KStream into KTable while preserving
>>> the required semantics seems not to be straight forwards.
>>>
>>> Any thoughts about this potential issue?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 5/8/17 3:05 PM, Garrett Barton wrote:
>>>
>>>> Michael,
>>>>    This is slick!  I am still writing unit tests to verify it.  My code
>>>> looks something like:
>>>>
>>>> KTable<Windowed<String>, CountSumMinMaxAvgObj> oneMinuteWindowed =
>>>> srcStream    // my val object isnt really called that, just wanted to
>>>> show
>>>> a sample set of calculations the value can do!
>>>>      .groupByKey(Serdes.String(), Serdes.Double())
>>>>      .aggregate(/*initializer */, /* aggregator */,
>>>> TimeWindows.of(60*1000,
>>>> 60*1000), "store1m");
>>>>
>>>>     // i used an aggregate here so I could have a non-primitive value
>>>> object
>>>> that does the calculations on each aggregator, pojo has an .add(Double)
>>>> in
>>>> it.
>>>> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> fiveMinuteWindowed =
>>>> oneMinuteWindowed    // I made my own Tuple2, will move window calc
>>>> into it
>>>>      .groupBy( (windowedKey, value) -> new KeyValue<>(new Tuple2<String,
>>>> Long>(windowedKey.key(), windowedKey.window().end() /1000/60/5
>>>> *1000*60*5),
>>>> value, keySerde, valSerde)
>>>>
>>>>          // the above rounds time down to a timestamp divisible by 5
>>>> minutes
>>>>
>>>>      .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>>>
>>>>          // where your subtractor can be as simple as (val, agg) -> agg
>>>> - val
>>>> for primitive types or as complex as you need,
>>>>
>>>>          // just make sure you get the order right (lesson hard learnt
>>>> ;) ),
>>>> subtraction is not commutative!
>>>>
>>>>          // again my val object has an .add(Obj) and a .sub() to handle
>>>> this, so nice!
>>>>
>>>>
>>>> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj>
>>>> fifteenMinuteWindowed =
>>>> fiveMinuteWindowed
>>>>
>>>>      .groupBy( (keyPair, value) -> new KeyValue<>(new Tuple2(keyPair._1,
>>>> keyPair._2 /1000/60/15 *1000*60*15), value, keySerde, valSerde)
>>>>
>>>>          // the above rounds time down to a timestamp divisible by 15
>>>> minutes
>>>>
>>>>      .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>>>
>>>>
>>>> KTable<Tuple2<String, Long>, CountSumMinMaxAvgObj> sixtyMinuteWindowed =
>>>> fifteeenMinuteWindowed
>>>>
>>>>      .groupBy( (keyPair, value) -> new KeyValue<>(new
>>>> Tuple2(keyPairair._1,
>>>> pair._2 /1000/60/60 *1000*60*60), value, keySerde, valSerde)
>>>>
>>>>          // the above rounds time down to a timestamp divisible by 60
>>>> minutes
>>>>
>>>>      .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>>>>
>>>>
>>>> Notes thus far:
>>>>    Doesn't look like I need to start the 5min with a windowed KTable
>>>> return
>>>> object, it starts with the regular KTable<Tuple2<String,Long>> in this
>>>> case.
>>>>    I thinking about using windowedKey.window().start() instead of end()
>>>> as I
>>>> believe that is more consistent with what the windows themselves put
>>>> out.
>>>> They go into the stores bound by their start time I believe.
>>>>    Serdes gets nuts as well as the Generic typing on some of these
>>>> classes
>>>> (yea you KeyValueMapper), makes for long code!  I had to specify them
>>>> everywhere since the key/val's changed.
>>>>
>>>>
>>>> I didn't get enough time to mess with it today, I will wrap up the unit
>>>> tests and run it to see how it performs against my real data as well
>>>> tomorrow.  I expect a huge reduction in resources (both streams and
>>>> kafka
>>>> storage) by moving to this.
>>>> Thank you!
>>>>
>>>>
>>>>
>>>> On Mon, May 8, 2017 at 5:26 PM, Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>> Michal,
>>>>>
>>>>> that's an interesting idea. In an ideal world, Kafka Streams should
>>>>> have
>>>>> an optimizer that is able to to this automatically under the hood. Too
>>>>> bad we are not there yet.
>>>>>
>>>>> @Garret: did you try this out?
>>>>>
>>>>> This seems to be a question that might affect many users, and it might
>>>>> we worth to document it somewhere as a recommended pattern.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 5/8/17 1:43 AM, Michal Borowiecki wrote:
>>>>>
>>>>>> Apologies,
>>>>>>
>>>>>> In the code snippet of course only oneMinuteWindowed KTable will have
>>>>>> a
>>>>>> Windowed key (KTable<Windowed<Key>, Value>), all others would be just
>>>>>> KTable<Tuple2<Key, Long>, Value>.
>>>>>>
>>>>>> Michał
>>>>>>
>>>>>> On 07/05/17 16:09, Michal Borowiecki wrote:
>>>>>>
>>>>>>> Hi Garrett,
>>>>>>>
>>>>>>> I've encountered a similar challenge in a project I'm working on
>>>>>>> (it's
>>>>>>> still work in progress, so please take my suggestions with a grain of
>>>>>>> salt).
>>>>>>>
>>>>>>> Yes, I believe KTable.groupBy lets you accomplish what you are aiming
>>>>>>> for with something like the following (same snippet attached as txt
>>>>>>>
>>>>>> file):
>>>>>
>>>>>>
>>>>>>> KTable<Windowed<Key>, Value> oneMinuteWindowed = yourStream    //
>>>>>>> where Key and Value stand for your actual key and value types
>>>>>>>
>>>>>>>      .groupByKey()
>>>>>>>
>>>>>>>      .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000),
>>>>>>>
>>>>>> "store1m");
>>>>>
>>>>>>          //where your adder can be as simple as (val, agg) -> agg +
>>>>>>> val
>>>>>>>
>>>>>>>          //for primitive types or as complex as you need
>>>>>>>
>>>>>>>
>>>>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fiveMinuteWindowed =
>>>>>>> oneMinuteWindowed    // Tuple2 for this example as defined by
>>>>>>> javaslang library
>>>>>>>
>>>>>>>      .groupBy( (windowedKey, value) -> new KeyValue<>(new
>>>>>>> Tuple2<>(windowedKey.key(), windowedKey.window().end() /1000/60/5
>>>>>>> *1000*60*5), value)
>>>>>>>
>>>>>>>          // the above rounds time down to a timestamp divisible by 5
>>>>>>> minutes
>>>>>>>
>>>>>>>      .reduce(/*your adder*/, /*your subtractor*/, "store5m");
>>>>>>>
>>>>>>>          // where your subtractor can be as simple as (val, agg) ->
>>>>>>> agg
>>>>>>> - valfor primitive types or as complex as you need,
>>>>>>>
>>>>>>>          // just make sure you get the order right (lesson hard
>>>>>>> learnt
>>>>>>> ;) ), subtraction is not commutative!
>>>>>>>
>>>>>>>
>>>>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> fifteenMinuteWindowed =
>>>>>>> fiveMinuteWindowed
>>>>>>>
>>>>>>>      .groupBy( (keyPair, value) -> new KeyValue<>(new
>>>>>>> Tuple2(keyPair._1, keyPair._2/1000/60/15 *1000*60*15), value)
>>>>>>>
>>>>>>>          // the above rounds time down to a timestamp divisible by 15
>>>>>>> minutes
>>>>>>>
>>>>>>>      .reduce(/*your adder*/, /*your subtractor*/, "store15m");
>>>>>>>
>>>>>>>
>>>>>>> KTable<Windowed<Tuple2<Key, Long>>, Value> sixtyMinuteWindowed =
>>>>>>> fifteeenMinuteWindowed
>>>>>>>
>>>>>>>      .groupBy( (keyPair, value) -> new KeyValue<>(new
>>>>>>> Tuple2(keyPairair._1, pair._2 /1000/60/60 *1000*60*60), value)
>>>>>>>
>>>>>>>          // the above rounds time down to a timestamp divisible by 5
>>>>>>> minutes
>>>>>>>
>>>>>>>      .reduce(/*your adder*/, /*your subtractor*/, "store60m");
>>>>>>>
>>>>>>>
>>>>>>> So, step by step:
>>>>>>>
>>>>>>>    * You use a windowed aggregation only once, from there on you use
>>>>>>>      the KTable abstraction only (which doesn't have windowed
>>>>>>>      aggregations).
>>>>>>>    * In each subsequent groupBy you map the key to a pair of
>>>>>>>      (your-real-key, timestamp) where the timestamp is rounded down
>>>>>>>      with the precision of the size of the new window.
>>>>>>>    * reduce() on a KGroupedTable takes an adder and a subtractor and
>>>>>>> it
>>>>>>>      will correctly update the new aggregate by first subtracting the
>>>>>>>      previous value of the upstream record before adding the new
>>>>>>> value
>>>>>>>      (this way, just as you said, the downstream is aware of the
>>>>>>>      statefulness of the upstream and correctly treats each record as
>>>>>>>      an update)
>>>>>>>    * If you want to reduce message volume further, you can break
>>>>>>> these
>>>>>>>      into separate KafkaStreams instances and configure downstream
>>>>>>> ones
>>>>>>>      with a higher commit.interval.ms (unfortunately you can't have
>>>>>>>      different values of this setting in different places of the same
>>>>>>>      topology I'm afraid)
>>>>>>>    * TODO: Look into retention policies, I haven't investigated that
>>>>>>> in
>>>>>>>      any detail.
>>>>>>>
>>>>>>> I haven't tested this exact code, so please excuse any typos.
>>>>>>>
>>>>>>> Also, if someone with more experience could chip in and check if I'm
>>>>>>> not talking nonsense here, or if there's an easier way to this, that
>>>>>>> would be great.
>>>>>>>
>>>>>>>
>>>>>>> I don't know if the alternative approach is possible, where you
>>>>>>> convert each resulting KTable back into a stream and just do a
>>>>>>> windowed aggregation somehow. That would feel more natural, but I
>>>>>>> haven't figured out how to correctly window over a changelog in the
>>>>>>> KStream abstraction, feels impossible in the high-level DSL.
>>>>>>>
>>>>>>> Hope that helps,
>>>>>>> Michal
>>>>>>>
>>>>>>> On 02/05/17 18:03, Garrett Barton wrote:
>>>>>>>
>>>>>>>> Lets say I want to sum values over increasing window sizes of
>>>>>>>> 1,5,15,60
>>>>>>>> minutes.  Right now I have them running in parallel, meaning if I am
>>>>>>>> producing 1k/sec records I am consuming 4k/sec to feed each
>>>>>>>>
>>>>>>> calculation.
>>>>>
>>>>>> In reality I am calculating far more than sum, and in this pattern I'm
>>>>>>>> looking at something like (producing rate)*(calculations)*(windows)
>>>>>>>>
>>>>>>> for a
>>>>>
>>>>>> consumption rate.
>>>>>>>>
>>>>>>>>   So I had the idea, could I feed the 1 minute window into the 5
>>>>>>>>
>>>>>>> minute, and
>>>>>
>>>>>> 5 into 15, and 15 into 60. Theoretically I would consume a fraction
>>>>>>>>
>>>>>>> of the
>>>>>
>>>>>> records, not have to scale as huge and be back to something like
>>>>>>>>
>>>>>>> (producing
>>>>>
>>>>>> rate)*(calculations)+(updates).
>>>>>>>>
>>>>>>>>    Thinking this is an awesome idea I went to try and implement it
>>>>>>>> and
>>>>>>>>
>>>>>>> got
>>>>>
>>>>>> twisted around.  These are windowed grouping operations that produce
>>>>>>>> KTables, which means instead of a raw stream I have an update
>>>>>>>> stream.
>>>>>>>>
>>>>>>> To
>>>>>
>>>>>> me this implies that downstream must be aware of this and consume
>>>>>>>>
>>>>>>> stateful
>>>>>
>>>>>> information, knowing that each record is an update and not an in
>>>>>>>>
>>>>>>> addition
>>>>>
>>>>>> to.  Does the high level api handle that construct and let me do
>>>>>>>>
>>>>>>> that?  For
>>>>>
>>>>>> a simple sum it would have to hold each of the latest values for say
>>>>>>>>
>>>>>>> the 5
>>>>>
>>>>>> 1 minute sum's in a given window, to perform the 5 minute sum.
>>>>>>>>
>>>>>>> Reading the
>>>>>
>>>>>> docs which are awesome, I cannot determine if the KTable.groupby()
>>>>>>>>
>>>>>>> would
>>>>>
>>>>>> work over a window, and would reduce or aggregate thus do what I need?
>>>>>>>>
>>>>>>>> Any ideas?
>>>>>>>>
>>>>>>>> --
>>>>>>> Signature
>>>>>>> <http://www.openbet.com/>    Michal Borowiecki
>>>>>>> Senior Software Engineer L4
>>>>>>>       T:      +44 208 742 1600
>>>>>>>
>>>>>>>
>>>>>>>       +44 203 249 8448
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>       E:      michal.borowie...@openbet.com
>>>>>>>       W:      www.openbet.com <http://www.openbet.com/>
>>>>>>>
>>>>>>>
>>>>>>>       OpenBet Ltd
>>>>>>>
>>>>>>>       Chiswick Park Building 9
>>>>>>>
>>>>>>>       566 Chiswick High Rd
>>>>>>>
>>>>>>>       London
>>>>>>>
>>>>>>>       W4 5XT
>>>>>>>
>>>>>>>       UK
>>>>>>>
>>>>>>>
>>>>>>> <https://www.openbet.com/email_promo>
>>>>>>>
>>>>>>> This message is confidential and intended only for the addressee. If
>>>>>>> you have received this message in error, please immediately notify
>>>>>>> the
>>>>>>> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
>>>>>>> from your system as well as any copies. The content of e-mails as
>>>>>>> well
>>>>>>> as traffic data may be monitored by OpenBet for employment and
>>>>>>> security purposes. To protect the environment please do not print
>>>>>>> this
>>>>>>> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick
>>>>>>> Park
>>>>>>> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
>>>>>>> company registered in England and Wales. Registered no. 3134634. VAT
>>>>>>> no. GB927523612
>>>>>>>
>>>>>>> --
>>>>>> Signature
>>>>>> <http://www.openbet.com/>     Michal Borowiecki
>>>>>> Senior Software Engineer L4
>>>>>>        T:      +44 208 742 1600
>>>>>>
>>>>>>
>>>>>>        +44 203 249 8448
>>>>>>
>>>>>>
>>>>>>
>>>>>>        E:      michal.borowie...@openbet.com
>>>>>>        W:      www.openbet.com <http://www.openbet.com/>
>>>>>>
>>>>>>
>>>>>>        OpenBet Ltd
>>>>>>
>>>>>>        Chiswick Park Building 9
>>>>>>
>>>>>>        566 Chiswick High Rd
>>>>>>
>>>>>>        London
>>>>>>
>>>>>>        W4 5XT
>>>>>>
>>>>>>        UK
>>>>>>
>>>>>>
>>>>>> <https://www.openbet.com/email_promo>
>>>>>>
>>>>>> This message is confidential and intended only for the addressee. If
>>>>>> you
>>>>>> have received this message in error, please immediately notify the
>>>>>> postmas...@openbet.com <mailto:postmas...@openbet.com> and delete it
>>>>>> from your system as well as any copies. The content of e-mails as well
>>>>>> as traffic data may be monitored by OpenBet for employment and
>>>>>> security
>>>>>> purposes. To protect the environment please do not print this e-mail
>>>>>> unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
>>>>>> Building
>>>>>> 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company
>>>>>> registered in England and Wales. Registered no. 3134634. VAT no.
>>>>>> GB927523612
>>>>>>
>>>>>>
>>>>>
>>
>

Reply via email to