Hi Peter,

Just to follow up on the actual bug, can you clarify two things:
* When you say "restart", do you mean an orderly shutdown and restart, or
a crash and restart?
* Have you tried this with EOS enabled? I can imagine some ways that
duplicates could occur, but they should be impossible with EOS enabled.
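(For reference, in case it saves you a lookup: EOS is a single config on the Streams app. This sketch uses the literal config strings that back the `StreamsConfig` constants so it compiles without the kafka-streams jar; the app id and bootstrap servers are placeholders.)

```java
import java.util.Properties;

public class EosConfig {
    // Builds a Streams config with exactly-once processing enabled.
    // The literal strings are the values behind StreamsConfig.*_CONFIG
    // constants (e.g. PROCESSING_GUARANTEE_CONFIG = "processing.guarantee").
    static Properties streamsProps(String appId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", appId);               // APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // BOOTSTRAP_SERVERS_CONFIG
        props.put("processing.guarantee", "exactly_once"); // enables EOS
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps("repro-app", "localhost:9092"));
    }
}
```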

Thanks for your help,
-John

On Mon, Jan 14, 2019 at 1:20 PM John Roesler <j...@confluent.io> wrote:

> Hi Peter,
>
> I see your train of thought, but the actual implementation of the
> window store is structured differently from your mental model.
> Unlike Key/Value stores, we know that the records in a window
> store will "expire" on a regular schedule, and also that every single
> record will eventually expire. With this in mind, we have implemented
> an optimization to avoid a lot of compaction overhead in RocksDB, as
> well as saving on range scans.
>
> Instead of storing everything in one database, we open several
> databases and bucket windows into them. Then, when windows
> expire, we just ignore the records (i.e., the API makes them unreachable,
> but we don't actually delete them). Once all the windows in a database
> are expired, we just close and delete the whole database. Then, we open
> a new one for new windows. If you look in the code, these databases are
> called "segments".
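(An aside, to make the "segments" idea above concrete: here is a toy in-memory model of bucketing windows into segments by timestamp and dropping whole segments at once. All names and the segment-interval math are illustrative, not Kafka's actual internals.)

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a segmented window store: windows are bucketed into
// segment "databases" by timestamp, and an entire segment is dropped
// once every window in it is past the retention horizon. This avoids
// per-record deletes (and the RocksDB compaction they would cause).
public class SegmentedStore {
    final long segmentInterval; // illustrative; e.g. retention / numSegments
    final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();

    SegmentedStore(long segmentInterval) {
        this.segmentInterval = segmentInterval;
    }

    long segmentId(long timestamp) {
        return timestamp / segmentInterval;
    }

    void put(long windowStart, String key, String value) {
        segments.computeIfAbsent(segmentId(windowStart), id -> new HashMap<>())
                .put(key, value);
    }

    // Expiry is O(#segments), not a per-record delete: every segment
    // entirely below the stream-time horizon is dropped wholesale.
    void advanceStreamTime(long streamTime, long retention) {
        long minLiveSegment = segmentId(streamTime - retention);
        segments.headMap(minLiveSegment).clear();
    }

    public static void main(String[] args) {
        SegmentedStore store = new SegmentedStore(10);
        store.put(5, "A", "a");
        store.put(25, "B", "b");
        store.advanceStreamTime(100, 80); // horizon at t=20: segment 0 dropped
        System.out.println(store.segments.keySet());
    }
}
```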
>
> Thus, I don't think that you should attempt to use the built-in window
> stores
> as you described. Instead, it should be straightforward to implement your
> own StateStore with a layout that's more favorable to your desired
> behavior.
>
> You should also be able to set up the changelog the way you need.
> If it's a compacted log, explicitly removed entities would get removed
> from the log too.
>
> Actually, what you're describing is *very* similar to the implementation
> for suppress. I might actually suggest that you just copy the suppression
> implementation and adapt it to your needs, or at the very least, study
> how it works. In doing so, you might actually discover the cause of the
> bug yourself!
>
> I hope this helps, and thanks for your help,
> -John
>
>
> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <peter.lev...@gmail.com>
> wrote:
>
>> Hi John,
>>
>> Thank you very much for explaining how WindowStore works. I have some
>> more questions...
>>
>> On 1/10/19 5:33 PM, John Roesler wrote:
>> > Hi Peter,
>> >
>> > Regarding retention, I was not referring to log retention, but to the
>> > window store retention.
>> > Since a new window is created every second (for example), there are in
>> > principle an unbounded
>> > number of windows (the longer the application runs, the more windows
>> there
>> > are, with no end).
>> > However, we obviously can't store an infinite amount of data, so the
>> window
>> > definition includes
>> > a retention period. By default, this is 24 hours. After the retention
>> > period elapses, all of the data
>> > for the window is purged to make room for new windows.
>>
>> Right. Would the following work for example:
>>
>> - configure retention of WindowStore to be "infinite"
>> - explicitly remove records from the store when windows are flushed out
>> - configure WindowStore log topic for compacting
>>
>> Something like the following:
>>
>>          Stores
>>              .windowStoreBuilder(
>>                  Stores.persistentWindowStore(
>>                      storeName,
>>                      // ~1000 years; note Duration.of(1000L, ChronoUnit.YEARS)
>>                      // would throw, since YEARS has no exact duration
>>                      Duration.ofDays(365_000L), // retentionPeriod
>>                      Duration.ofSeconds(10), // windowSize
>>                      false // retainDuplicates
>>                  ),
>>                  keySerde, valSerde
>>              )
>>              .withCachingEnabled()
>>              .withLoggingEnabled(
>>                  Map.of(
>>                      TopicConfig.CLEANUP_POLICY_CONFIG,
>>                      TopicConfig.CLEANUP_POLICY_COMPACT
>>                  )
>>              );
>>
>> In the above scenario, would:
>>
>> - the on-disk WindowStore be kept bounded (there could be some very old
>> entries in it, but the majority would be new, depending on the activity
>> of particular input keys)
>> - the log topic be kept bounded (explicitly removed entries would be
>> removed from the compacted log too)
>>
>> I'm moving away from DSL partly because I have some problems with
>> suppression (which I hope we'll be able to fix) and partly because the
>> DSL can't give me the complicated semantics that I need for the
>> application at hand. I tried to capture what I need in a custom
>> Transformer here:
>>
>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>
>> Your knowledge of how WindowStore works would greatly help me decide if
>> this is a workable idea.
>>
>> >
>> > So what I meant was that if you buffer some key "A" in window (Monday
>> > 09:00:00) and then get
>> > no further activity for A for over 24 hours, then when you do get that
>> next
>> > event for A, say at
>> > (Tuesday 11:00:00), you'd do the scan but find nothing, since your
>> buffered
>> > state would already
>> > have been purged from the store.
>>
>> Right. That would be the case when WindowStore was configured with
>> default retention of 24 hours. A quick question: What does window size
>> configuration for WindowStore (see above) do? Does it have to be
>> synchronized with the size of windows stored in it?
>>
>> >
>> > The way I avoided this problem for Suppression was to organize the data
>> by
>> > timestamp instead
>> > of by key, so on *every* update I can search for all the keys that are
>> old
>> > enough and emit them.
>> > I also don't use a window store, so I don't have to worry about the
>> > retention time.
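(To make the timestamp-first layout described above concrete, here is a rough sketch: the buffer is keyed by timestamp first, then record key, so a single incoming event can flush every ripe entry regardless of key. The class and method names are mine; the real suppression buffer in Kafka Streams is considerably more involved.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy time-ordered suppression buffer: entries are indexed by
// (timestamp, key), so eviction is a single oldest-first range scan
// rather than a per-key lookup.
public class TimeOrderedBuffer {
    // Outer map sorted by timestamp, inner map by record key.
    final TreeMap<Long, TreeMap<String, String>> buffer = new TreeMap<>();

    void put(long timestamp, String key, String value) {
        buffer.computeIfAbsent(timestamp, t -> new TreeMap<>()).put(key, value);
    }

    // Called on every update: emit and remove everything that is old
    // enough, for all keys, not just the key of the current record.
    List<String> evictRipe(long streamTime, long suppressionMillis) {
        List<String> emitted = new ArrayList<>();
        NavigableMap<Long, TreeMap<String, String>> ripe =
                buffer.headMap(streamTime - suppressionMillis, true);
        ripe.values().forEach(byKey -> emitted.addAll(byKey.values()));
        ripe.clear();
        return emitted;
    }

    public static void main(String[] args) {
        TimeOrderedBuffer b = new TimeOrderedBuffer();
        b.put(10, "A", "a1");
        b.put(20, "B", "b1");
        System.out.println(b.evictRipe(30, 15)); // only A is ripe at t=30
    }
}
```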
>> >
>> > To answer your question about the window store's topic, it configures a
>> > retention time the same
>> > length as the store's retention time, (and they keys are the full
>> windowed
>> > key including the window
>> > start time), so it'll have roughly the same size bound as the store
>> itself.
>>
>> Would explicitly removed entries from WindowStore be removed from log
>> too if it was a compacting log?
>>
>> >
>> > Back to the process of figuring out what might be wrong with
>> Suppression, I
>> > don't suppose you
>> > would be able to file a Jira and upload a repro program? If not, that's
>> ok.
>> > I haven't been able to
>> > reproduce the bug yet, but it seems like it's happening somewhat
>> > consistently for you, so I should
>> > be able to get it to happen eventually.
>> >
>> > Thanks, and sorry again for the troubles.
>> > -John
>>
>> I can prepare a minimal reproducer. No problem...
>>
>> Regards, Peter
>>
>> >
>> > On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <peter.lev...@gmail.com>
>> wrote:
>> >
>> >>
>> >> On 1/8/19 12:57 PM, Peter Levart wrote:
>> >>> Hi John,
>> >>>
>> >>> On 1/8/19 12:45 PM, Peter Levart wrote:
>> >>>>> I looked at your custom transformer, and it looks almost correct to
>> >>>>> me. The
>> >>>>> only flaw seems to be that it only looks
>> >>>>> for closed windows for the key currently being processed, which
>> >>>>> means that
>> >>>>> if you have key "A" buffered, but don't get another event for it
>> for a
>> >>>>> while after the window closes, you won't emit the final result. This
>> >>>>> might
>> >>>>> actually take longer than the window retention period, in which
>> >>>>> case, the
>> >>>>> data would be deleted without ever emitting the final result.
>> >>>> So in DSL case, the suppression works by flushing *all* of the "ripe"
>> >>>> windows in the whole buffer whenever a single event comes in with
>> >>>> recent enough timestamp regardless of the key of that event?
>> >>>>
>> >>>> Is the buffer shared among processing tasks or does each task
>> >>>> maintain its own private buffer that only contains its share of data
>> >>>> pertaining to assigned input partitions? In case the tasks are
>> >>>> executed on several processing JVM(s) the buffer can't really be
>> >>>> shared, right? In that case a single event can't flush all of the
>> >>>> "ripe" windows, but just those that are contained in the task's part
>> >>>> of buffer...
>> >>> Just a question about your comment above:
>> >>>
>> >>> /"This might actually take longer than the window retention period, in
>> >>> which case, the data would be deleted without ever emitting the final
>> >>> result"/
>> >>>
>> >>> Are you talking about the buffer log topic retention? Aren't log
>> >>> topics configured to "compact" rather than "delete" messages? So the
>> >>> last "version" of the buffer entry for a particular key should stay
>> >>> forever? What are the keys in suppression buffer log topic? Are they a
>> >>> pair of (timestamp, key) ? Probably not since in that case the
>> >>> compacted log would grow indefinitely...
>> >>>
>> >>> Another question:
>> >>>
>> >>> What are the keys in WindowStore's log topic? If the input keys to the
>> >>> processor that uses such WindowStore consist of a bounded set of
>> >>> values (for example user ids), would compacted log of such WindowStore
>> >>> also be bounded?
>> >> In case the key of WindowStore log topic is (timestamp, key) then would
>> >> explicitly deleting flushed entries from WindowStore (by putting null
>> >> value into the store) keep the compacted log bounded? In other words,
>> >> does WindowStore log topic support a special kind of "tombstone"
>> message
>> >> that effectively removes the key from the compacted log?
>> >>
>> >> In that case, my custom processor could keep entries in its WindowStore
>> >> for as long as needed, depending on the activity of a particular input
>> >> key...
>> >>
>> >>> Regards, Peter
>> >>>
>> >>>
>> >>
>>
>>
