Sure John, I will document it.
Thanks a lot for your reply.

--
Jonathan

On Tue, Mar 5, 2019 at 7:38 PM John Roesler <j...@confluent.io> wrote:

> Hi Jonathan,
>
> Just a quick update: I have not been able to reproduce the duplicates issue
> with the 2.2 RC, even with a topology very similar to the one you included
> in your stackoverflow post.
>
> I think we should treat this as a new bug. Would you mind opening a new
> Jira bug ticket with some steps to reproduce the problem, and also exactly
> the behavior you observe?
>
> Thanks,
> -John
>
> On Mon, Mar 4, 2019 at 10:41 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi Jonathan,
> >
> > Sorry to hear that the feature is causing you trouble as well, and that
> > the 2.2 release candidate didn't seem to fix it.
> >
> > I'll try to do a repro based on the code in your SO post tomorrow.
> >
> > I don't think it's related to the duplicates, but that shutdown error is
> > puzzling. Can you print the topology (with topology.describe())? This
> > will tell us what is in task 1 (i.e., *1_*) of your program.
> >
> > Thanks,
> > -John
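For reference, printing the topology John asks about is a one-liner. A minimal sketch, assuming the app builds its topology with a StreamsBuilder named builder and configures it with a Properties object props (both hypothetical names, not from the thread):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;

    Topology topology = builder.build();
    // Task ids such as 0_0 and 1_1 are <sub-topology>_<partition>, so the
    // description of sub-topology 1 shows what lives in the *1_* tasks.
    System.out.println(topology.describe());
    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();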
> > On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
> > jonathansanti...@gmail.com> wrote:
> >
> >> BTW, after stopping the app gracefully (KafkaStreams#close()), this
> >> error shows up repeatedly:
> >>
> >> 2019-03-01 17:18:07,819 WARN
> >> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
> >> [0_0] Failed to write offset checkpoint file to
> >> [/tmp/kafka-stream/XXX/0_0/.checkpoint]
> >>
> >> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp
> >> (No such file or directory)
> >>   at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
> >>   at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
> >>   at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
> >>   at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
> >>   at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
> >>
> >> However, I have checked, and the folder created starts with: *1_*
> >>
> >> ls -lha /tmp/kafka-stream/XXX/1_1
> >> total 8
> >> drwxr-xr-x   5 a  b  160B  1 Mar 17:18 .
> >> drwxr-xr-x  34 a  b  1.1K  1 Mar 17:15 ..
> >> -rw-r--r--   1 a  b  2.9K  1 Mar 17:18 .checkpoint
> >> -rw-r--r--   1 a  b    0B  1 Mar 16:05 .lock
> >> drwxr-xr-x   3 a  b   96B  1 Mar 16:43 KSTREAM-REDUCE-STATE-STORE-0000000005
> >>
> >> Cheers!
> >> --
> >> Jonathan
> >>
> >> On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <
> >> jonathansanti...@gmail.com> wrote:
> >>
> >> > Hello John, hope you are well.
> >> > I have tested the version 2.2 release candidate (although I know it
> >> > has been postponed). I have been following this email thread because
> >> > I think I am experiencing the same issue. I have reported it in an
> >> > email to this list, and all the details are also in SO (
> >> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> >> > ).
> >> >
> >> > After the test, the result is the same as before (at least in my
> >> > case): already-processed records are passed again to the output
> >> > topic, causing the data duplication:
> >> >
> >> > ...
> >> > 2019-03-01 16:55:23,808 INFO
> >> > [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> > internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> >> > stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> > No checkpoint found for task 1_10 state store
> >> > KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> >> > XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS
> >> > turned on. *Reinitializing the task and restore its state from the
> >> > beginning.*
> >> > ...
> >> >
> >> > I was hoping for this to be fixed, but it is not the case, at least
> >> > for me.
> >> >
> >> > If you can, please take a look at the question in SO. I was in
> >> > contact with Matthias about it; he also pointed me to the place where
> >> > the potential bug could be present.
> >> >
> >> > Please, let me know any thoughts.
> >> >
> >> > Cheers!
> >> > --
> >> > Jonathan
> >> >
> >> > On Tue, Feb 26, 2019 at 5:23 PM John Roesler <j...@confluent.io> wrote:
> >> >
> >> >> Hi again, Peter,
> >> >>
> >> >> Just to close the loop about the bug in Suppress: we did get what
> >> >> appears to be the same report from a few other people:
> >> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> >>
> >> >> I also managed to reproduce the duplicate-result behavior, which
> >> >> could cause it to emit both intermediate results and duplicate final
> >> >> results.
> >> >>
> >> >> There's a patch for it in the 2.2 release candidate. Perhaps you can
> >> >> try it out and see if it resolves the issue for you?
> >> >>
> >> >> I'm backporting the fix to 2.1 as well, but I unfortunately missed
> >> >> the last 2.1 bugfix release.
> >> >>
> >> >> Thanks,
> >> >> -John
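For context, the topology shape under discussion (a windowed reduce whose result is suppressed until the window closes) looks roughly like the sketch below. This is not Jonathan's actual code; the topic names, window size, and grace period are placeholders:

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input-topic")
           .groupByKey()
           .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30)))
           .reduce((oldValue, newValue) -> newValue) // backs the KSTREAM-REDUCE state store
           // Backs the KTABLE-SUPPRESS state store seen in the logs above;
           // emits one final result per key per window, once the window closes.
           .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
           .toStream()
           .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
           .to("output-topic");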
> >> >> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <j...@confluent.io> wrote:
> >> >>
> >> >> > Hi Peter,
> >> >> >
> >> >> > Thanks for the replies.
> >> >> >
> >> >> > Regarding transactions:
> >> >> > Yes, actually, with EOS enabled, the changelog and the output
> >> >> > topics are all produced with the same transactional producer,
> >> >> > within the same transactions. So it should already be atomic.
> >> >> >
> >> >> > Regarding restore:
> >> >> > Streams doesn't put the store into service until the restore is
> >> >> > completed, so it should be guaranteed not to happen. But there's of
> >> >> > course no guarantee that I didn't mess something up. I'll take a
> >> >> > hard look at it.
> >> >> >
> >> >> > Regarding restoration and offsets:
> >> >> > Your guess is correct: Streams tracks the latest stored offset
> >> >> > outside of the store implementation itself, specifically by
> >> >> > writing a file (called a Checkpoint File) in the state directory.
> >> >> > If the file is there, it reads that offset and restores from that
> >> >> > point. If the file is missing, it restores from the beginning of
> >> >> > the stream. So it should "just work" for you. Just for
> >> >> > completeness, there have been several edge cases discovered where
> >> >> > this mechanism isn't completely safe, so in the case of EOS, I
> >> >> > believe we actually disregard that checkpoint file and the prior
> >> >> > state and always rebuild from the earliest offset in the changelog.
> >> >> >
> >> >> > Personally, I would like to see us provide the ability to store
> >> >> > the checkpoint inside the state store, so that checkpoint updates
> >> >> > are linearized correctly w.r.t. data updates, but I actually
> >> >> > haven't mentioned this thought to anyone until now ;)
> >> >> >
> >> >> > Finally, regarding your prior email:
> >> >> > Yes, I was thinking that the "wrong" output values might be part
> >> >> > of rolled-back transactions, and therefore enabling read-committed
> >> >> > mode on the consumer might tell a different story than what you've
> >> >> > seen to date.
> >> >> >
> >> >> > I'm honestly still baffled about those intermediate results that
> >> >> > are sneaking out. I wonder if it's something specific to your data
> >> >> > stream, like maybe an edge case where two records have exactly the
> >> >> > same timestamp? I'll have to stare at the code some more...
> >> >> >
> >> >> > Regardless, in order to reap the benefits of running the app with
> >> >> > EOS, you really have to also set your consumers to read_committed.
> >> >> > Otherwise, you'll be seeing output data from aborted (aka
> >> >> > rolled-back) transactions, and you miss the intended "exactly
> >> >> > once" guarantee.
> >> >> >
> >> >> > Thanks,
> >> >> > -John
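The consumer setting John refers to here is isolation.level=read_committed. A minimal sketch of a verifying consumer, where the bootstrap servers and group id are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "output-verifier");         // placeholder
    // Only deliver records from committed transactions; the default
    // ("read_uncommitted") also returns records from aborted transactions,
    // which can look like duplicates when EOS is enabled upstream.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    KafkaConsumer<byte[], byte[]> consumer =
            new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());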
> >> >> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <peter.lev...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Hi John,
> >> >> >>
> >> >> >> I haven't been able to reinstate the demo yet, but I have been
> >> >> >> re-reading the following scenario of yours...
> >> >> >>
> >> >> >> On 1/24/19 11:48 PM, Peter Levart wrote:
> >> >> >> > Hi John,
> >> >> >> >
> >> >> >> > On 1/24/19 3:18 PM, John Roesler wrote:
> >> >> >> >
> >> >> >> >> The reason is that, upon restart, the suppression buffer can
> >> >> >> >> only "remember" what got sent & committed to its changelog
> >> >> >> >> topic before.
> >> >> >> >>
> >> >> >> >> The scenario I have in mind is:
> >> >> >> >>
> >> >> >> >> ...
> >> >> >> >> * buffer state X
> >> >> >> >> ...
> >> >> >> >> * flush state X to buffer changelog
> >> >> >> >> ...
> >> >> >> >> * commit transaction T0; start new transaction T1
> >> >> >> >> ...
> >> >> >> >> * emit final result X (in uncommitted transaction T1)
> >> >> >> >> ...
> >> >> >> >> * crash before flushing to the changelog the fact that state X
> >> >> >> >>   was emitted. Also, transaction T1 gets aborted, since we
> >> >> >> >>   crash before committing.
> >> >> >> >> ...
> >> >> >> >> * restart, restoring state X again from the changelog (because
> >> >> >> >>   the emit didn't get committed)
> >> >> >> >> * start transaction T2
> >> >> >> >> * emit final result X again (in uncommitted transaction T2)
> >> >> >> >> ...
> >> >> >> >> * commit transaction T2
> >> >> >> >> ...
> >> >> >> >>
> >> >> >> >> So, the result gets emitted twice, but the first time is in an
> >> >> >> >> aborted transaction. This leads me to another clarifying
> >> >> >> >> question:
> >> >> >> >>
> >> >> >> >> Based on your first message, it seems like the duplicates you
> >> >> >> >> observe are in the output topic. When you read the topic, do
> >> >> >> >> you configure your consumer with "read committed" mode? If
> >> >> >> >> not, you'll see "results" from uncommitted transactions, which
> >> >> >> >> could explain the duplicates.
> >> >> >>
> >> >> >> ...and I was thinking that perhaps the right solution to the
> >> >> >> suppression problem would be to use transactional producers for
> >> >> >> the resulting output topic AND the store change-log. Is this
> >> >> >> possible? Does the compaction of the log on the brokers work for
> >> >> >> transactional producers as expected? In that case, the sending of
> >> >> >> the final result and the marking of that fact in the store change
> >> >> >> log would together be an atomic operation.
> >> >> >>
> >> >> >> That said, I think there's another problem with suppression: it
> >> >> >> looks like the suppression processor is already processing the
> >> >> >> input while the state store has not been fully restored yet, or
> >> >> >> something related... Is this guaranteed not to happen?
> >> >> >>
> >> >> >> And now something unrelated I wanted to ask...
> >> >> >>
> >> >> >> I'm trying to create my own custom state store. From the API I
> >> >> >> can see it is pretty straightforward. One thing that I don't
> >> >> >> quite understand is how Kafka Streams knows whether to replay the
> >> >> >> whole change log after the store registers itself, or just a part
> >> >> >> of it, and which part (from which offset per partition). There
> >> >> >> doesn't seem to be any API point through which the store could
> >> >> >> communicate this information back to Kafka Streams. Is such
> >> >> >> bookkeeping performed outside the store? Does Kafka Streams first
> >> >> >> invoke flush() on the store and then note down the offsets from
> >> >> >> the change log producer somewhere, so that the next time the
> >> >> >> store is brought up, the log is only replayed from the last
> >> >> >> noted-down offset? So it can happen that the store gets some log
> >> >> >> entries that have already been incorporated into it (from the
> >> >> >> point of one flush before), but it never misses any... In any
> >> >> >> case, there has to be an indication somewhere that the store
> >> >> >> didn't survive and has to be rebuilt from scratch. How does Kafka
> >> >> >> Streams detect that situation? By placing some marker file into
> >> >> >> the directory reserved for the store's local storage?
> >> >> >>
> >> >> >> Regards, Peter
> >> >> >
> >> >
> >> > --
> >> > Santilli Jonathan
> >>
> >> --
> >> Santilli Jonathan

--
Santilli Jonathan
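On Peter's custom-store question: the hook a store gets is ProcessorContext#register, which hands Streams a restore callback for changelog replay; the offset bookkeeping (the .checkpoint file John describes) is kept outside the store. A minimal sketch of where that hook lives, with the actual storage logic elided and the store name a placeholder:

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.StateStore;

    public class MyCustomStore implements StateStore {
        private boolean open;

        @Override
        public void init(ProcessorContext context, StateStore root) {
            // Registration hands Streams a callback to apply replayed
            // changelog records. Which offsets get replayed is decided by
            // Streams from the checkpoint file in the state directory,
            // not by the store itself.
            context.register(root, (key, value) -> {
                // apply one restored changelog record to local storage
            });
            open = true;
        }

        @Override public String name() { return "my-custom-store"; }
        @Override public void flush() { /* write buffered state durably */ }
        @Override public void close() { open = false; }
        @Override public boolean persistent() { return true; }
        @Override public boolean isOpen() { return open; }
    }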