KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2018-12-19 Peter Levart
Hello, I'm trying to use Kafka Streams to aggregate some time series data using 1-second tumbling time windows. The data is ordered approximately by timestamp, with some "jitter" which I'm limiting at the input with a custom TimestampExtractor that moves events into the future if they come in to

2018-12-19 Peter Levart
I see the list processor managed to smash my beautifully formatted HTML message. For that reason I'm re-sending the sample code snippet in plain text mode... Here's a sample Kafka Streams processor: KStream input = builder.stream(inpu

2018-12-20 Guozhang Wang
Hello Peter, Thanks for filing this report. I've looked into the source code and I think I may have spotted an edge case relevant to your observations. To validate whether my suspicion is correct, could you try modifying your DSL code slightly to use a very large suppression buffer size. BTW, the StrictBuffer

2018-12-21 Peter Levart
Hello Guozhang, Thank you for looking into this problem. I noticed that I have been using an internal class constructor and later discovered the right API to create the StrictBufferConfig implementations. But I'm afraid that using your proposed factory method won't change anything since its i

2018-12-21 Peter Levart
Hello Guozhang, May I just add some more observations which might help you pinpoint the problem... When the process that runs the Kafka Streams processing threads is restarted, I can see duplicates in the output topic. But that is understandable for "at least once semantics" and I don't min

2018-12-25 Peter Levart
Hello Guozhang, Just wanted to say that I have managed to come up with a different solution that doesn't have these problems. Instead of doing the following: kStream.groupByKey().windowedBy(timeWindows).aggregate(initializer, aggregator,

2018-12-26 Peter Levart
On 12/21/18 3:16 PM, Peter Levart wrote: I also see some results that are actual non-final window aggregations that precede the final aggregations. These non-final results are never emitted out of order (for example, no such non-final result would ever come after the final result for a parti

2019-01-07 John Roesler
Hi Peter, Sorry, I have only just now seen this thread. You asked if this behavior is unexpected, and the answer is yes. Suppressed.untilWindowCloses is intended to emit only the final result, regardless of restarts. You also asked how the suppression buffer can resume after a restart, since it's not p

2019-01-08 Peter Levart
Hi John, On 1/7/19 9:10 PM, John Roesler wrote: Hi Peter, Sorry, I have only just now seen this thread. You asked if this behavior is unexpected, and the answer is yes. Suppressed.untilWindowCloses is intended to emit only the final result, regardless of restarts. You also asked how the suppression

2019-01-08 Peter Levart
Hi John, On 1/8/19 12:45 PM, Peter Levart wrote: I looked at your custom transformer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed, which means that if you have key "A" buffered, but don't get another e
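The flaw described above (checking closed windows only for the key currently being processed) can be illustrated with a toy, in-memory model in plain Java. This is a sketch under stated assumptions, not Kafka's suppression buffer and not the transformer from the thread (its code is not shown here): the class name `FinalResultBuffer`, the per-key sum aggregation, and the `key@windowStart=sum` output format are hypothetical. It keeps one aggregate per (key, window) and, every time stream time advances, emits every window whose end plus grace is at or before stream time, for all keys. Nothing is persisted, so restart behavior is not modeled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model of "emit only the final result per window" (hypothetical class).
 * NOT Kafka's suppression buffer: no changelog, nothing survives a restart.
 * The aggregation is a per-key sum of record values.
 */
final class FinalResultBuffer {
    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    // windowEnd -> (key -> running sum), sorted so the oldest windows sit at the head.
    private final TreeMap<Long, TreeMap<String, Long>> byWindowEnd = new TreeMap<>();

    FinalResultBuffer(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Aggregates one record, then returns every final result it made emittable. */
    List<String> process(String key, long timestamp, long value) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        // A record whose window has already closed is dropped, never re-emitted.
        if (windowEnd + graceMs > streamTime) {
            byWindowEnd.computeIfAbsent(windowEnd, end -> new TreeMap<>())
                       .merge(key, value, Long::sum);
        }
        return flushClosedWindows();
    }

    /** Emits and forgets every window with end + grace <= stream time, for ALL keys. */
    private List<String> flushClosedWindows() {
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it =
                byWindowEnd.headMap(streamTime - graceMs, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> window = it.next();
            long windowStart = window.getKey() - windowSizeMs;
            for (Map.Entry<String, Long> result : window.getValue().entrySet()) {
                emitted.add(result.getKey() + "@" + windowStart + "=" + result.getValue());
            }
            it.remove(); // removal through the headMap view also removes from byWindowEnd
        }
        return emitted;
    }
}
```

With 1-second windows and no grace, records for keys "A" and "B" in window [0, 1000) stay buffered until a record for an unrelated key "C" at t=1000 advances stream time; that single record then flushes the final results for both "A" and "B". A later record for "A" at t=900 is dropped rather than producing a second result for the closed window.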

2019-01-08 Peter Levart
On 1/8/19 12:57 PM, Peter Levart wrote: Hi John, On 1/8/19 12:45 PM, Peter Levart wrote: I looked at your custom transformer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed, which means that if you ha

2019-01-10 John Roesler
Hi Peter, Regarding retention, I was not referring to log retention, but to the window store retention. Since a new window is created every second (for example), there are in principle an unbounded number of windows (the longer the application runs, the more windows there are, with no end). Howeve
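The point that new windows keep appearing forever, so a window store must bound what it keeps, can be made concrete with a toy model of one key's 1-second windows. This is an illustrative sketch only, not Kafka's window store: the class name `RetainedWindows` is hypothetical, and the expiry rule used here (keep windows whose start is at or after `streamTime - retention`) is an assumption of the sketch, not Kafka's exact rule.

```java
import java.util.TreeMap;

/**
 * Toy model of window-store retention for ONE key (hypothetical class).
 * New windows appear forever, but windows starting before the retention
 * horizon (streamTime - retention) are dropped. Not Kafka's window store.
 */
final class RetainedWindows {
    private final long windowSizeMs;
    private final long retentionMs;
    private long streamTime = Long.MIN_VALUE;
    // windowStart -> record count, sorted so expired windows sit at the head.
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();

    RetainedWindows(long windowSizeMs, long retentionMs) {
        if (retentionMs < windowSizeMs) {
            throw new IllegalArgumentException("retention must cover at least one window");
        }
        this.windowSizeMs = windowSizeMs;
        this.retentionMs = retentionMs;
    }

    void record(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long horizon = streamTime - retentionMs; // oldest window start still retained
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        if (windowStart >= horizon) {
            countsByWindowStart.merge(windowStart, 1L, Long::sum);
        }
        // Expire every window that starts before the horizon.
        countsByWindowStart.headMap(horizon).clear();
    }

    int retainedWindowCount() {
        return countsByWindowStart.size();
    }
}
```

Feeding one record per second for ten seconds with a 5-second retention leaves only the six windows starting at 4000..9000 ms; a late record for an already-expired window does not recreate it, so the number of stored windows stays bounded no matter how long the application runs.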

2019-01-14 John Roesler
Hi Peter, I see your train of thought, but the actual implementation of the window store is structured differently from your mental model. Unlike Key/Value stores, we know that the records in a window store will "expire" on a regular schedule, and also that every single record will eventually expi

2019-01-22 John Roesler
Hi Peter, Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or crash and restart?
* have you tried this with EOS enabled? I can imagine some ways that there could be duplicates, but they should be impossible with EOS e
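Enabling EOS, as asked above, is a single Streams setting. A minimal sketch, assuming the 2.x releases discussed in this thread, where the `processing.guarantee` value is `exactly_once` (the default being `at_least_once`). It uses the literal config names so it compiles without the Kafka Streams jar; the class and method names, application id, and broker address are placeholders. In real code one would normally use the `StreamsConfig` constants noted in the comments and pass the result to the `KafkaStreams` constructor.

```java
import java.util.Properties;

/** Hypothetical helper; literal keys stand in for the StreamsConfig constants. */
final class EosConfigSketch {
    /** Streams properties with exactly-once processing enabled (Kafka 2.x value). */
    static Properties eosProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);      // StreamsConfig.APPLICATION_ID_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.PROCESSING_GUARANTEE_CONFIG = StreamsConfig.EXACTLY_ONCE
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }
}
```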

2019-01-23 Peter Levart
Hi John, Sorry I haven't had time to prepare the minimal reproducer yet. I still have plans to do it though... On 1/22/19 8:02 PM, John Roesler wrote: Hi Peter, Just to follow up on the actual bug, can you confirm whether: * when you say "restart", do you mean orderly shutdown and restart, o

2019-01-24 John Roesler
Hi Peter, Thanks for the clarification. When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't think that Streams automatically registers a shutdown hook. In our examples and demos, we register a shutdown hook "outside" of streams (right next to the code that calls start() ). U
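Registering the shutdown hook "outside" of Streams, next to the `start()` call, is a standard JVM pattern. A sketch in plain Java, assuming the close action is passed as a `Runnable` (in an application it would typically be `streams::close` for the `KafkaStreams` instance); the class and method names are hypothetical. JVM shutdown hooks run on SIGTERM, SIGINT and normal exit, but not on SIGKILL.

```java
/** Hypothetical helper for closing a streams instance on JVM shutdown. */
final class ShutdownHookSketch {
    /**
     * Registers a JVM shutdown hook that runs closeAction (e.g. streams::close)
     * and returns the hook thread so callers can deregister it if needed.
     */
    static Thread closeOnShutdown(Runnable closeAction) {
        Thread hook = new Thread(closeAction, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

In an application this would sit right before `streams.start()`, so that hitting "stop" (SIGTERM) triggers an orderly `close()` instead of an abrupt exit.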

2019-01-24 Peter Levart
Hi John, On 1/24/19 3:18 PM, John Roesler wrote: Hi Peter, Thanks for the clarification. When you hit the "stop" button, AFAIK it does send a SIGTERM, but I don't think that Streams automatically registers a shutdown hook. In our examples and demos, we register a shutdown hook "outside" of str

2019-01-24 Peter Levart
Hi John, Haven't been able to reinstate the demo yet, but I have been re-reading the following scenario of yours On 1/24/19 11:48 PM, Peter Levart wrote: Hi John, On 1/24/19 3:18 PM, John Roesler wrote: The reason is that, upon restart, the suppression buffer can only "remember" what

2019-01-25 John Roesler
Hi Peter, Thanks for the replies. Regarding transactions: Yes, actually, with EOS enabled, the changelog and the output topics are all produced with the same transactional producer, within the same transactions. So it should already be atomic. Regarding restore: Streams doesn't put the store int

2019-02-26 John Roesler
Hi again, Peter, Just to close the loop about the bug in Suppress, we did get the (apparent) same report from a few other people: https://issues.apache.org/jira/browse/KAFKA-7895 I also managed to reproduce the duplicate-result behavior, which could cause it to emit both intermediate results and

2019-03-01 Jonathan Santilli
Hello John, hope you are well. I have tested the 2.2 release candidate (although I know it has been postponed). I have been following this email thread because I think I am experiencing the same issue. I have reported it in an email to this list, and all the details are also on SO ( https://stack

2019-03-01 Jonathan Santilli
BTW, after stopping the app gracefully (Stream#close()), this error shows up repeatedly: 2019-03-01 17:18:07,819 WARN [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] internals.ProcessorStateManager (ProcessorStateManager.java:327) - task [0_0] Failed to write offset checkpoint file to [/

2019-03-04 John Roesler
Hi Jonathan, Sorry to hear that the feature is causing you trouble as well, and that the 2.2 release candidate didn't seem to fix it. I'll try and do a repro based on the code in your SO post tomorrow. I don't think it's related to the duplicates, but that shutdown error is puzzling. Can you pri

2019-03-05 John Roesler
Hi Jonathan, Just a quick update: I have not been able to reproduce the duplicates issue with the 2.2 RC, even with a topology very similar to the one you included in your stackoverflow post. I think we should treat this as a new bug. Would you mind opening a new Jira bug ticket with some steps t

2019-03-08 Jonathan Santilli
Sure John, I will document it. Thanks a lot for your reply. -- Jonathan