Hi Peter,

Just to follow up on the actual bug, can you confirm:

* When you say "restart", do you mean an orderly shutdown and restart, or a crash and restart?
* Have you tried this with EOS (exactly-once semantics) enabled? I can imagine some ways that duplicates could occur, but they should be impossible with EOS enabled.
Thanks for your help,
-John

On Mon, Jan 14, 2019 at 1:20 PM John Roesler <[email protected]> wrote:

> Hi Peter,
>
> I see your train of thought, but the actual implementation of the
> window store is structured differently from your mental model.
> Unlike key/value stores, we know that the records in a window
> store will "expire" on a regular schedule, and also that every single
> record will eventually expire. With this in mind, we have implemented
> an optimization to avoid a lot of compaction overhead in RocksDB, as
> well as to save on range scans.
>
> Instead of storing everything in one database, we open several
> databases and bucket windows into them. Then, when windows
> expire, we just ignore the records (i.e., the API makes them unreachable,
> but we don't actually delete them). Once all the windows in a database
> have expired, we close and delete the whole database, and then open
> a new one for new windows. If you look in the code, these databases are
> called "segments".
>
> Thus, I don't think you should attempt to use the built-in window stores
> as you described. Instead, it should be straightforward to implement your
> own StateStore with a layout that's more favorable to your desired
> behavior.
>
> You should also be able to set up the changelog the way you need.
> Explicitly removed entries would get removed from the log as well, if
> it's a compacted log.
>
> Actually, what you're describing is *very* similar to the implementation
> of suppress. I might actually suggest that you just copy the suppression
> implementation and adapt it to your needs, or at the very least study
> how it works. In doing so, you might actually discover the cause of the
> bug yourself!
>
> I hope this helps, and thanks for your help,
> -John
>
>
> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart <[email protected]> wrote:
>
>> Hi John,
>>
>> Thank you very much for explaining how WindowStore works. I have some
>> more questions...
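The "segments" layout John describes can be sketched in plain Java like this. This is a toy illustration of the idea only, not the actual Kafka Streams code; the class, constants, and method names are all invented, and real segment stores use RocksDB instances rather than in-memory maps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of segmenting: windows are bucketed into time-based segments,
// and retention is enforced by dropping whole segments, never by deleting
// individual records.
public class SegmentedStoreSketch {
    static final long SEGMENT_INTERVAL_MS = 60_000;  // each segment covers 1 minute
    static final long RETENTION_MS = 3 * 60_000;     // keep ~3 minutes of windows

    // segmentStart -> (key -> value); TreeMap keeps segments in time order
    final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    long observedStreamTime = 0;

    void put(String key, long windowStart, String value) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        long segmentStart = windowStart - (windowStart % SEGMENT_INTERVAL_MS);
        segments.computeIfAbsent(segmentStart, s -> new HashMap<>()).put(key, value);
        // Cheap expiry: drop every segment entirely below the retention bound.
        segments.headMap(observedStreamTime - RETENTION_MS).clear();
    }

    String fetch(String key, long windowStart) {
        // Expired windows are unreachable through the API, even if a record
        // happens to still sit in a not-yet-dropped segment.
        if (windowStart < observedStreamTime - RETENTION_MS) return null;
        long segmentStart = windowStart - (windowStart % SEGMENT_INTERVAL_MS);
        Map<String, String> segment = segments.get(segmentStart);
        return segment == null ? null : segment.get(key);
    }
}
```

The point of the design is visible in `put`: expiry costs one `headMap(...).clear()` over whole buckets instead of per-record deletes, which is what saves the RocksDB compaction overhead John mentions.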
>>
>> On 1/10/19 5:33 PM, John Roesler wrote:
>> > Hi Peter,
>> >
>> > Regarding retention, I was not referring to log retention, but to the
>> > window store retention.
>> > Since a new window is created every second (for example), there are in
>> > principle an unbounded number of windows (the longer the application
>> > runs, the more windows there are, with no end).
>> > However, we obviously can't store an infinite amount of data, so the
>> > window definition includes a retention period. By default, this is 24
>> > hours. After the retention period elapses, all of the data for the
>> > window is purged to make room for new windows.
>>
>> Right. Would the following work, for example:
>>
>> - configure the WindowStore retention to be "infinite"
>> - explicitly remove records from the store when windows are flushed out
>> - configure the WindowStore log topic for compaction
>>
>> Something like the following:
>>
>> Stores
>>     .windowStoreBuilder(
>>         Stores.persistentWindowStore(
>>             storeName,
>>             // Duration.of(1000L, ChronoUnit.YEARS) would throw, since
>>             // Duration.of only accepts units up to DAYS; use days instead:
>>             Duration.ofDays(365 * 1000L), // retentionPeriod (~1000 years)
>>             Duration.ofSeconds(10),       // windowSize
>>             false
>>         ),
>>         keySerde, valSerde
>>     )
>>     .withCachingEnabled()
>>     .withLoggingEnabled(
>>         Map.of(
>>             TopicConfig.CLEANUP_POLICY_CONFIG,
>>             TopicConfig.CLEANUP_POLICY_COMPACT
>>         )
>>     );
>>
>> In the above scenario, would:
>>
>> - the on-disk WindowStore be kept bounded (there could be some very old
>>   entries in it, but the majority would be new, depending on the
>>   activity of particular input keys)
>> - the log topic be kept bounded (explicitly removed entries would be
>>   removed from the compacted log too)
>>
>> I'm moving away from the DSL partly because I have some problems with
>> suppression (which I hope we'll be able to fix) and partly because the
>> DSL can't give me the complicated semantics that I need for the
>> application at hand.
>> I tried to capture what I need in a custom Transformer here:
>>
>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>
>> Your knowledge of how WindowStore works would greatly help me decide if
>> this is a workable idea.
>>
>> >
>> > So what I meant was that if you buffer some key "A" in window (Monday
>> > 09:00:00) and then get no further activity for A for over 24 hours,
>> > then when you do get that next event for A, say at (Tuesday 11:00:00),
>> > you'd do the scan but find nothing, since your buffered state would
>> > already have been purged from the store.
>>
>> Right. That would be the case when the WindowStore was configured with
>> the default retention of 24 hours. A quick question: What does the
>> window size configuration of the WindowStore (see above) do? Does it
>> have to be synchronized with the size of the windows stored in it?
>>
>> >
>> > The way I avoided this problem for Suppression was to organize the
>> > data by timestamp instead of by key, so on *every* update I can search
>> > for all the keys that are old enough and emit them.
>> > I also don't use a window store, so I don't have to worry about the
>> > retention time.
>> >
>> > To answer your question about the window store's topic: it configures
>> > a retention time the same length as the store's retention time (and
>> > the keys are the full windowed keys, including the window start time),
>> > so it'll have roughly the same size bound as the store itself.
>>
>> Would entries explicitly removed from the WindowStore be removed from
>> the log too, if it was a compacted log?
>>
>> >
>> > Back to the process of figuring out what might be wrong with
>> > Suppression: I don't suppose you would be able to file a Jira and
>> > upload a repro program? If not, that's ok. I haven't been able to
>> > reproduce the bug yet, but it seems like it's happening somewhat
>> > consistently for you, so I should be able to get it to happen
>> > eventually.
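The timestamp-ordered buffer John describes for suppression can be sketched as follows. This is a simplified illustration in plain Java, not the actual suppression code; the class and method names are invented, and the real implementation additionally keeps only the latest value per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the suppression trick: index buffered records by timestamp, not
// by key, so that *any* incoming record can flush every entry that is old
// enough, regardless of which key it carries.
public class TimestampBufferSketch {
    final long suppressMs;
    // bufferTime -> (key -> value); time-ordered, so the oldest entries come first
    final TreeMap<Long, Map<String, String>> buffer = new TreeMap<>();

    TimestampBufferSketch(long suppressMs) { this.suppressMs = suppressMs; }

    // Buffer the record, then emit every buffered record (any key) whose
    // timestamp has fallen behind streamTime by more than suppressMs.
    List<String> process(String key, String value, long streamTime) {
        buffer.computeIfAbsent(streamTime, t -> new HashMap<>()).put(key, value);
        List<String> emitted = new ArrayList<>();
        Map<Long, Map<String, String>> ripe = buffer.headMap(streamTime - suppressMs);
        ripe.values().forEach(m -> m.forEach((k, v) -> emitted.add(k + "=" + v)));
        ripe.clear(); // emitted entries leave the buffer
        return emitted;
    }
}
```

Because the outer index is time rather than key, the "ripe" scan is a cheap prefix of the `TreeMap`; this is exactly why a single late event for key "C" can flush buffered results for "A" and "B" within the same task.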
>> >
>> > Thanks, and sorry again for the troubles.
>> > -John
>>
>> I can prepare a minimal reproducer. No problem...
>>
>> Regards, Peter
>>
>> >
>> > On Tue, Jan 8, 2019 at 6:48 AM Peter Levart <[email protected]> wrote:
>> >
>> >>
>> >> On 1/8/19 12:57 PM, Peter Levart wrote:
>> >>> Hi John,
>> >>>
>> >>> On 1/8/19 12:45 PM, Peter Levart wrote:
>> >>>>> I looked at your custom transformer, and it looks almost correct
>> >>>>> to me. The only flaw seems to be that it only looks for closed
>> >>>>> windows for the key currently being processed, which means that if
>> >>>>> you have key "A" buffered but don't get another event for it for a
>> >>>>> while after the window closes, you won't emit the final result.
>> >>>>> This might actually take longer than the window retention period,
>> >>>>> in which case the data would be deleted without ever emitting the
>> >>>>> final result.
>> >>>> So in the DSL case, the suppression works by flushing *all* of the
>> >>>> "ripe" windows in the whole buffer whenever a single event comes in
>> >>>> with a recent enough timestamp, regardless of the key of that event?
>> >>>>
>> >>>> Is the buffer shared among processing tasks, or does each task
>> >>>> maintain its own private buffer that only contains its share of the
>> >>>> data pertaining to the assigned input partitions? In case the tasks
>> >>>> are executed on several processing JVM(s), the buffer can't really
>> >>>> be shared, right? In that case a single event can't flush all of
>> >>>> the "ripe" windows, but just those that are contained in the task's
>> >>>> part of the buffer...
>> >>> Just a question about your comment above:
>> >>>
>> >>> "This might actually take longer than the window retention period,
>> >>> in which case, the data would be deleted without ever emitting the
>> >>> final result."
>> >>>
>> >>> Are you talking about the buffer log topic retention? Aren't log
>> >>> topics configured to "compact" rather than "delete" messages?
>> >>> So the last "version" of the buffer entry for a particular key
>> >>> should stay forever? What are the keys in the suppression buffer log
>> >>> topic? Are they a pair of (timestamp, key)? Probably not, since in
>> >>> that case the compacted log would grow indefinitely...
>> >>>
>> >>> Another question:
>> >>>
>> >>> What are the keys in the WindowStore's log topic? If the input keys
>> >>> to the processor that uses such a WindowStore consist of a bounded
>> >>> set of values (for example, user ids), would the compacted log of
>> >>> such a WindowStore also be bounded?
>> >> In case the key of the WindowStore log topic is (timestamp, key),
>> >> would explicitly deleting flushed entries from the WindowStore (by
>> >> putting a null value into the store) keep the compacted log bounded?
>> >> In other words, does the WindowStore log topic support a special kind
>> >> of "tombstone" message that effectively removes the key from the
>> >> compacted log?
>> >>
>> >> In that case, my custom processor could keep entries in its
>> >> WindowStore for as long as needed, depending on the activity of a
>> >> particular input key...
>> >>
>> >>> Regards, Peter
>> >>>
>> >>>
>> >>
>> >>
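The compacted-topic semantics Peter is asking about can be modeled in a few lines. This is a toy model of log compaction, not Kafka code: compaction keeps only the latest value per key, and a null value (a "tombstone") eventually removes the key entirely, which is why explicit deletes keep a compacted changelog bounded by the live key set.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a compacted changelog: replay the log in order, keeping only
// the latest value per key; a null value is a tombstone that drops the key.
public class CompactionSketch {
    public static Map<String, String> compact(String[][] log) {
        Map<String, String> latest = new HashMap<>();
        for (String[] record : log) {
            String key = record[0], value = record[1];
            if (value == null) {
                latest.remove(key); // tombstone: key disappears after compaction
            } else {
                latest.put(key, value); // newer value replaces the older one
            }
        }
        return latest;
    }
}
```

If the changelog key were (timestamp, key), every record would have a unique key and compaction could never reclaim anything; with plain keys plus tombstones, the retained size tracks the number of live keys, which is the distinction the thread turns on.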
