Sure John, I will document it.

Thanks a lot for your reply.
--
Jonathan



On Tue, Mar 5, 2019 at 7:38 PM John Roesler <j...@confluent.io> wrote:

> Hi Jonathan,
>
> Just a quick update: I have not been able to reproduce the duplicates issue
> with the 2.2 RC, even with a topology very similar to the one you included
> in your stackoverflow post.
>
> I think we should treat this as a new bug. Would you mind opening a new
> Jira bug ticket with some steps to reproduce the problem, and also exactly
> the behavior you observe?
>
> Thanks,
> -John
>
> On Mon, Mar 4, 2019 at 10:41 PM John Roesler <j...@confluent.io> wrote:
>
> > Hi Jonathan,
> >
> > Sorry to hear that the feature is causing you trouble as well, and that
> > the 2.2 release candidate didn't seem to fix it.
> >
> > I'll try and do a repro based on the code in your SO post tomorrow.
> >
> > I don't think it's related to the duplicates, but that shutdown error is
> > puzzling. Can you print the topology (with topology.describe())? This
> > will tell us what is in task 1 (i.e., *1_*) of your program.
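> >
> > For reference, something along these lines prints it (a minimal sketch,
> > assuming the DSL with StreamsBuilder; the surrounding setup is omitted):
> >
> >     final StreamsBuilder builder = new StreamsBuilder();
> >     // ... define the processing logic as usual ...
> >     final Topology topology = builder.build();
> >     // describe() lists every sub-topology and its processors/state stores,
> >     // which tells us what lives in task group 1 (the 1_* directories).
> >     System.out.println(topology.describe());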
> >
> > Thanks,
> > -John
> >
> > On Fri, Mar 1, 2019 at 11:33 AM Jonathan Santilli <
> > jonathansanti...@gmail.com> wrote:
> >
> >> BTW, after stopping the app gracefully (KafkaStreams#close()), this error
> >> shows up repeatedly:
> >>
> >> 2019-03-01 17:18:07,819 WARN
> >> [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> internals.ProcessorStateManager (ProcessorStateManager.java:327) - task
> >> [0_0] Failed to write offset checkpoint file to
> >> [/tmp/kafka-stream/XXX/0_0/.checkpoint]
> >>
> >> java.io.FileNotFoundException: /tmp/kafka-stream/XXX/0_0/.checkpoint.tmp
> >> (No such file or directory)
> >>   at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_191]
> >>   at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_191]
> >>   at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_191]
> >>   at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_191]
> >>   at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79) ~[kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:325) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:599) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:721) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:337) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:267) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1209) [kafka-streams-2.2.0.jar:?]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:786) [kafka-streams-2.2.0.jar:?]
> >>
> >> However, I have checked and the folder created starts with: *1_*
> >>
> >> ls -lha /tmp/kafka-stream/XXX/1_1
> >> total 8
> >> drwxr-xr-x   5 a  b   160B  1 Mar 17:18 .
> >> drwxr-xr-x  34 a  b   1.1K  1 Mar 17:15 ..
> >> -rw-r--r--   1 a  b   2.9K  1 Mar 17:18 .checkpoint
> >> -rw-r--r--   1 a  b     0B  1 Mar 16:05 .lock
> >> drwxr-xr-x   3 a  b    96B  1 Mar 16:43
> >> KSTREAM-REDUCE-STATE-STORE-0000000005
> >>
> >>
> >>
> >> Cheers!
> >> --
> >> Jonathan
> >>
> >>
> >>
> >> On Fri, Mar 1, 2019 at 5:11 PM Jonathan Santilli <
> >> jonathansanti...@gmail.com>
> >> wrote:
> >>
> >> > Hello John, hope you are well.
> >> > I have tested the version 2.2 release candidate (although I know it has
> >> > been postponed).
> >> > I have been following this email thread because I think I am experiencing
> >> > the same issue. I have reported it in an email to this list, and all the
> >> > details are also in SO (
> >> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> >> > ).
> >> >
> >> > After the test, the result is the same as before (at least in my case):
> >> > already-processed records are passed again to the output topic, causing
> >> > data duplication:
> >> >
> >> > ...
> >> > 2019-03-01 16:55:23,808 INFO
> >> > [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1]
> >> > internals.StoreChangelogReader (StoreChangelogReader.java:221) -
> >> > stream-thread [XXX-116ba7c8-678e-47f7-9074-7d03627b1e1a-StreamThread-1] No
> >> > checkpoint found for task 1_10 state store
> >> > KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
> >> > XXX-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-10 with EOS turned on.
> >> > *Reinitializing the task and restore its state from the beginning.*
> >> >
> >> > ...
> >> >
> >> >
> >> > I was hoping for this to be fixed, but that is not the case, at least
> >> > for me.
> >> >
> >> > If you can, please take a look at the question on SO. I was in contact
> >> > with Matthias about it, and he also pointed me to the place where the
> >> > potential bug could be present.
> >> >
> >> > Please, let me know any thoughts.
> >> >
> >> >
> >> > Cheers!
> >> > --
> >> > Jonathan
> >> >
> >> >
> >> > On Tue, Feb 26, 2019 at 5:23 PM John Roesler <j...@confluent.io> wrote:
> >> >
> >> >> Hi again, Peter,
> >> >>
> >> >> Just to close the loop about the bug in Suppress, we did get apparently
> >> >> the same report from a few other people:
> >> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> >>
> >> >> I also managed to reproduce the duplicate-result behavior, which could
> >> >> cause it to emit both intermediate results and duplicate final results.
> >> >>
> >> >> There's a patch for it in the 2.2 release candidate. Perhaps you can try
> >> >> it out and see if it resolves the issue for you?
> >> >>
> >> >> I'm backporting the fix to 2.1 as well, but I unfortunately missed the
> >> >> last 2.1 bugfix release.
> >> >>
> >> >> Thanks,
> >> >> -John
> >> >>
> >> >> On Fri, Jan 25, 2019 at 10:23 AM John Roesler <j...@confluent.io> wrote:
> >> >>
> >> >> > Hi Peter,
> >> >> >
> >> >> > Thanks for the replies.
> >> >> >
> >> >> > Regarding transactions:
> >> >> > Yes, actually, with EOS enabled, the changelog and the output topics are
> >> >> > all produced with the same transactional producer, within the same
> >> >> > transactions. So it should already be atomic.
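> >> >> >
> >> >> > (For illustration, a minimal sketch of turning EOS on -- the application
> >> >> > id, bootstrap servers, and topology variable are placeholders:)
> >> >> >
> >> >> >     final Properties props = new Properties();
> >> >> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
> >> >> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >> >> >     // With this setting, changelog and output records are written by one
> >> >> >     // transactional producer, within the same transactions.
> >> >> >     props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> >> >> >     final KafkaStreams streams = new KafkaStreams(topology, props);
> >> >> >     streams.start();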
> >> >> >
> >> >> > Regarding restore:
> >> >> > Streams doesn't put the store into service until the restore is completed,
> >> >> > so it should be guaranteed not to happen. But there's of course no
> >> >> > guarantee that I didn't mess something up. I'll take a hard look at it.
> >> >> >
> >> >> > Regarding restoration and offsets:
> >> >> > Your guess is correct: Streams tracks the latest stored offset outside of
> >> >> > the store implementation itself, specifically by writing a file (called a
> >> >> > Checkpoint File) in the state directory. If the file is there, it reads
> >> >> > that offset and restores from that point. If the file is missing, it
> >> >> > restores from the beginning of the stream. So it should "just work" for
> >> >> > you. Just for completeness, there have been several edge cases discovered
> >> >> > where this mechanism isn't completely safe, so in the case of EOS, I
> >> >> > believe we actually disregard that checkpoint file and the prior state and
> >> >> > always rebuild from the earliest offset in the changelog.
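> >> >> >
> >> >> > (A rough illustration of that decision, not the actual Streams code --
> >> >> > checkpointOffsets and eosEnabled stand in for the internal bookkeeping:)
> >> >> >
> >> >> >     static long restoreStartOffset(final Consumer<byte[], byte[]> consumer,
> >> >> >                                    final TopicPartition changelogPartition,
> >> >> >                                    final Map<TopicPartition, Long> checkpointOffsets,
> >> >> >                                    final boolean eosEnabled) {
> >> >> >         if (eosEnabled || !checkpointOffsets.containsKey(changelogPartition)) {
> >> >> >             // No trusted checkpoint: wipe local state and replay the
> >> >> >             // changelog from the very beginning.
> >> >> >             return consumer.beginningOffsets(Collections.singleton(changelogPartition))
> >> >> >                            .get(changelogPartition);
> >> >> >         }
> >> >> >         // Checkpoint present: resume restoration from the recorded offset.
> >> >> >         return checkpointOffsets.get(changelogPartition);
> >> >> >     }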
> >> >> >
> >> >> > Personally, I would like to see us provide the ability to store the
> >> >> > checkpoint inside the state store, so that checkpoint updates are
> >> >> > linearized correctly w.r.t. data updates, but I actually haven't mentioned
> >> >> > this thought to anyone until now ;)
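> >> >> >
> >> >> > (Purely hypothetical sketch of that idea, using a RocksDB write batch --
> >> >> > nothing like this exists in Streams today:)
> >> >> >
> >> >> >     import org.rocksdb.RocksDB;
> >> >> >     import org.rocksdb.RocksDBException;
> >> >> >     import org.rocksdb.WriteBatch;
> >> >> >     import org.rocksdb.WriteOptions;
> >> >> >
> >> >> >     class CheckpointingStore {
> >> >> >         // Reserved key under which the changelog offset would be stored.
> >> >> >         private static final byte[] CHECKPOINT_KEY = "__checkpoint_offset__".getBytes();
> >> >> >         private final RocksDB db;
> >> >> >         private final WriteOptions writeOptions = new WriteOptions();
> >> >> >
> >> >> >         CheckpointingStore(final RocksDB db) { this.db = db; }
> >> >> >
> >> >> >         void putWithCheckpoint(final byte[] key, final byte[] value, final long offset)
> >> >> >                 throws RocksDBException {
> >> >> >             try (WriteBatch batch = new WriteBatch()) {
> >> >> >                 batch.put(key, value);
> >> >> >                 batch.put(CHECKPOINT_KEY, java.nio.ByteBuffer.allocate(8).putLong(offset).array());
> >> >> >                 // One atomic write, so data and checkpoint can never diverge.
> >> >> >                 db.write(writeOptions, batch);
> >> >> >             }
> >> >> >         }
> >> >> >     }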
> >> >> >
> >> >> > Finally, regarding your prior email:
> >> >> > Yes, I was thinking that the "wrong" output values might be part of
> >> >> > rolled-back transactions, and therefore enabling read-committed mode on
> >> >> > the consumer might tell a different story than what you've seen to date.
> >> >> >
> >> >> > I'm honestly still baffled about those intermediate results that are
> >> >> > sneaking out. I wonder if it's something specific to your data stream,
> >> >> > like maybe an edge case where two records have exactly the same
> >> >> > timestamp? I'll have to stare at the code some more...
> >> >> >
> >> >> > Regardless, in order to reap the benefits of running the app with EOS, you
> >> >> > really have to also set your consumers to read_committed. Otherwise, you'll
> >> >> > be seeing output data from aborted (aka rolled-back) transactions, and you
> >> >> > miss the intended "exactly once" guarantee.
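> >> >> >
> >> >> > (For example, a minimal consumer sketch -- the group id and topic name
> >> >> > are placeholders:)
> >> >> >
> >> >> >     final Properties props = new Properties();
> >> >> >     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >> >> >     props.put(ConsumerConfig.GROUP_ID_CONFIG, "output-verifier");
> >> >> >     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> >> >> >     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> >> >> >     // Only records from committed transactions are returned; results from
> >> >> >     // aborted (rolled-back) transactions are filtered out.
> >> >> >     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
> >> >> >     final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> >> >> >     consumer.subscribe(Collections.singletonList("output-topic"));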
> >> >> >
> >> >> > Thanks,
> >> >> > -John
> >> >> >
> >> >> > On Fri, Jan 25, 2019 at 1:51 AM Peter Levart <peter.lev...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> >> Hi John,
> >> >> >>
> >> >> >> Haven't been able to reinstate the demo yet, but I have been re-reading
> >> >> >> the following scenario of yours....
> >> >> >>
> >> >> >> On 1/24/19 11:48 PM, Peter Levart wrote:
> >> >> >> > Hi John,
> >> >> >> >
> >> >> >> > On 1/24/19 3:18 PM, John Roesler wrote:
> >> >> >> >
> >> >> >> >>
> >> >> >> >> The reason is that, upon restart, the suppression buffer can only
> >> >> >> >> "remember" what got sent & committed to its changelog topic before.
> >> >> >> >>
> >> >> >> >> The scenario I have in mind is:
> >> >> >> >>
> >> >> >> >> ...
> >> >> >> >> * buffer state X
> >> >> >> >> ...
> >> >> >> >> * flush state X to buffer changelog
> >> >> >> >> ...
> >> >> >> >> * commit transaction T0; start new transaction T1
> >> >> >> >> ...
> >> >> >> >> * emit final result X (in uncommitted transaction T1)
> >> >> >> >> ...
> >> >> >> >> * crash before flushing to the changelog the fact that state X was
> >> >> >> >> emitted.
> >> >> >> >> Also, transaction T1 gets aborted, since we crash before committing.
> >> >> >> >> ...
> >> >> >> >> * restart, restoring state X again from the changelog (because the
> >> >> >> >> emit didn't get committed)
> >> >> >> >> * start transaction T2
> >> >> >> >> * emit final result X again (in uncommitted transaction T2)
> >> >> >> >> ...
> >> >> >> >> * commit transaction T2
> >> >> >> >> ...
> >> >> >> >>
> >> >> >> >> So, the result gets emitted twice, but the first time is in an aborted
> >> >> >> >> transaction. This leads me to another clarifying question:
> >> >> >> >>
> >> >> >> >> Based on your first message, it seems like the duplicates you observe
> >> >> >> >> are in the output topic. When you read the topic, do you configure your
> >> >> >> >> consumer with "read committed" mode? If not, you'll see "results" from
> >> >> >> >> uncommitted transactions, which could explain the duplicates.
> >> >> >>
> >> >> >> ...and I was thinking that perhaps the right solution to the suppression
> >> >> >> problem would be to use transactional producers for the resulting output
> >> >> >> topic AND the store change-log. Is this possible? Does the compaction of
> >> >> >> the log on the brokers work for transactional producers as expected? In
> >> >> >> that case, the sending of the final result and the marking of that fact in
> >> >> >> the store change log would together be an atomic operation.
> >> >> >> That said, I think there's another problem with suppression: it looks
> >> >> >> like the suppression processor is already processing the input while the
> >> >> >> state store has not been fully restored yet, or something related... Is
> >> >> >> this guaranteed not to happen?
> >> >> >>
> >> >> >> And now something unrelated I wanted to ask...
> >> >> >>
> >> >> >> I'm trying to create my own custom state store. From the API I can see
> >> >> >> it is pretty straightforward. One thing that I don't quite understand is
> >> >> >> how Kafka Streams knows whether to replay the whole change log after the
> >> >> >> store registers itself, or just a part of it, and which part (from which
> >> >> >> offset per partition). There doesn't seem to be any API point through
> >> >> >> which the store could communicate this information back to Kafka
> >> >> >> Streams. Is such bookkeeping performed outside the store? Does Kafka
> >> >> >> Streams first invoke flush() on the store and then note down the
> >> >> >> offsets from the change log producer somewhere? So the next time the store
> >> >> >> is brought up, the log is only replayed from the last noted-down offset? So
> >> >> >> it can happen that the store gets some log entries that have already
> >> >> >> been incorporated in it (from the point of one flush before) but never
> >> >> >> misses any... In any case, there has to be an indication somewhere that
> >> >> >> the store didn't survive and has to be rebuilt from scratch. How does
> >> >> >> Kafka Streams detect that situation? By placing some marker file into
> >> >> >> the directory reserved for the store's local storage?
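> >> >> >>
> >> >> >> (For reference, the registration point in the API looks roughly like this
> >> >> >> in a minimal store sketch -- the in-memory map and store name are just
> >> >> >> placeholders:)
> >> >> >>
> >> >> >>     import java.util.HashMap;
> >> >> >>     import java.util.Map;
> >> >> >>     import org.apache.kafka.common.utils.Bytes;
> >> >> >>     import org.apache.kafka.streams.processor.ProcessorContext;
> >> >> >>     import org.apache.kafka.streams.processor.StateStore;
> >> >> >>
> >> >> >>     public class MyStore implements StateStore {
> >> >> >>         private final Map<Bytes, byte[]> map = new HashMap<>();
> >> >> >>         private boolean open;
> >> >> >>
> >> >> >>         @Override
> >> >> >>         public String name() { return "my-store"; }
> >> >> >>
> >> >> >>         @Override
> >> >> >>         public void init(final ProcessorContext context, final StateStore root) {
> >> >> >>             open = true;
> >> >> >>             // The store only supplies the restore callback; Streams itself
> >> >> >>             // decides which changelog offsets to replay into it.
> >> >> >>             context.register(root, (key, value) -> map.put(Bytes.wrap(key), value));
> >> >> >>         }
> >> >> >>
> >> >> >>         @Override
> >> >> >>         public void flush() { }
> >> >> >>
> >> >> >>         @Override
> >> >> >>         public void close() { open = false; }
> >> >> >>
> >> >> >>         @Override
> >> >> >>         public boolean persistent() { return false; }
> >> >> >>
> >> >> >>         @Override
> >> >> >>         public boolean isOpen() { return open; }
> >> >> >>     }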
> >> >> >>
> >> >> >> Regards, Peter
> >> >> >>
> >> >> >>
> >> >>
> >> >
> >> >
> >> > --
> >> > Santilli Jonathan
> >> >
> >>
> >>
> >> --
> >> Santilli Jonathan
> >>
> >
>


-- 
Santilli Jonathan
