[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878549#comment-16878549 ]
Patrik Kleindl edited comment on KAFKA-5998 at 7/4/19 10:57 AM:
----------------------------------------------------------------

As this is hopefully coming to an end thanks to [~vvcephei], I'll try a short writeup so others can compare their setup and hopefully verify that the fix covers all situations. I'll also attach the code I used to reproduce the issue; it should be runnable for anyone with Docker. [^Kafka5998.zip]

Setup: We have a streams application with two subtopologies, where the first one does not have state and the second one does.

State directories are created for all tasks, but as the first subtopology does not have state, its directory does not get locked internally and is therefore subject to the cleanup process. The "trigger" for the cleanup is the absence of any traffic on a topic partition, which allows the directory to be deleted, so this is more likely on low-volume topics (in our case, CDC from a reference table with little to no changes).

After the directory is deleted, the next record received (and any thereafter) will trigger the warning, because Kafka Streams erroneously tries to write a checkpoint: all tasks tried to do this for all restored partitions, not only those assigned to them. This is what John's PR covers.

We have EOS disabled, so it would be good if people could tell whether this has happened with EOS too, or whether this warning has occurred for anyone with a different topology or on a high-volume topic.

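The failure mode described above can be sketched in isolation with plain JDK file I/O. This is not Kafka Streams code: the class and method names are hypothetical, the task directory name `0_1` is borrowed from the log in the issue description, and the bytes written are placeholder content. It only mimics what the stack trace shows, namely a `FileOutputStream` being opened on `.checkpoint.tmp` inside a task directory that has been deleted in the meantime.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch, not part of Kafka Streams.
class CheckpointAfterCleanupSketch {

    // Tries to create <taskDir>/.checkpoint.tmp and write placeholder bytes.
    // Returns false if the file cannot be opened (e.g. the directory is gone).
    static boolean tryWriteCheckpoint(File taskDir) {
        File tmp = new File(taskDir, ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write("placeholder\n".getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (FileNotFoundException e) {
            // Same exception type as in the reported warning.
            System.out.println("Failed to write checkpoint file: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns {write succeeded before cleanup, write succeeded after cleanup}.
    static boolean[] simulate() {
        try {
            Path stateDir = Files.createTempDirectory("kstreams-sketch");
            File taskDir = new File(stateDir.toFile(), "0_1");
            Files.createDirectories(taskDir.toPath());

            // While the task directory exists, the write works.
            boolean before = tryWriteCheckpoint(taskDir);

            // Stand-in for the cleanup removing an unlocked, idle task directory.
            Files.delete(new File(taskDir, ".checkpoint.tmp").toPath());
            Files.delete(taskDir.toPath());

            // The next checkpoint attempt targets a directory that no longer exists.
            boolean after = tryWriteCheckpoint(taskDir);

            Files.delete(stateDir);
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = simulate();
        System.out.println("write before cleanup succeeded: " + result[0]);
        System.out.println("write after cleanup succeeded:  " + result[1]);
    }
}
```

In this sketch the first write succeeds and the second one fails with a `FileNotFoundException`, because `FileOutputStream` does not create missing parent directories.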
For reference our topology:
{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: store-repartition)
      <-- KSTREAM-FILTER-0000000005

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [store-repartition])
      --> KSTREAM-REDUCE-0000000003
    Processor: KSTREAM-REDUCE-0000000003 (stores: [store])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-REDUCE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: outputTopic)
      <-- KTABLE-TOSTREAM-0000000007{code}

> /.checkpoint.tmp Not found exception
> ------------------------------------
>
>                 Key: KAFKA-5998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5998
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>            Reporter: Yogesh BG
>            Assignee: Bill Bejeck
>            Priority: Critical
>         Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, exc.txt, props.txt, streams.txt
>
>
> I have one kafka broker and one kafka stream running...
> I have been running it for two days under a load of around 2500 msgs per second. On the third day I am getting the exception below for some of the partitions. I have 16 partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
>         at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
>         at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
>         at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
>         at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> }}