[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878549#comment-16878549
 ] 

Patrik Kleindl edited comment on KAFKA-5998 at 7/4/19 10:57 AM:
----------------------------------------------------------------

As this is hopefully coming to an end thanks to [~vvcephei], I'll attempt a 
short write-up so others can compare their setups and verify that the fix 
covers all situations.

I'll also attach the code I used to reproduce the issue; it should be runnable 
for anyone with Docker. [^Kafka5998.zip]

Setup:

We have a streams application with two sub-topologies: the first one is 
stateless and the second one is stateful.

State directories are created for all tasks, but because the first 
sub-topology has no state, its directory does not get locked internally and is 
therefore subject to the cleanup process.

The "trigger" for the cleanup is the absence of any traffic on a topic 
partition, which allows the directory to be deleted. This makes the problem 
more likely on low-volume topics (in our case, CDC from a reference table with 
little to no changes).
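
For reference, the cleanup interval is controlled by the Streams config 
{{state.cleanup.delay.ms}} (default 600000 ms, i.e. 10 minutes). A sketch of a 
possible workaround, assuming one only wants to shrink the window in which the 
race can happen (this does not fix the root cause):
{code}
# Streams worker config (workaround sketch, not a fix):
# lengthen the delay before idle task directories are removed.
# Default is 600000 (10 minutes); here raised to 24 hours.
state.cleanup.delay.ms=86400000
{code}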

After the directory is deleted, the next record received (and every one 
thereafter) triggers the warning, because Kafka Streams erroneously tries to 
write a checkpoint: each task attempted this for all restored partitions, not 
only those assigned to it. This is what John's PR covers.
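
The failure mode itself is plain {{java.io}} behavior, independent of Kafka: 
once the task directory has been removed by the cleanup thread, opening 
{{<taskDir>/.checkpoint.tmp}} fails with the {{FileNotFoundException}} seen in 
the stack trace. A small stand-alone demo (hypothetical class and method names, 
not Kafka Streams code):
{code:java}
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.FileOutputStream;
import java.nio.file.Files;

public class CheckpointDemo {

    // Returns true if a checkpoint tmp file can be created in the given task directory.
    static boolean canWriteCheckpoint(File taskDir) {
        File tmp = new File(taskDir, ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            return true;
        } catch (FileNotFoundException e) {
            // "No such file or directory": the parent task directory is gone
            return false;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        File taskDir = Files.createTempDirectory("0_1").toFile();
        System.out.println(canWriteCheckpoint(taskDir)); // directory exists -> true
        new File(taskDir, ".checkpoint.tmp").delete();
        taskDir.delete();                                // simulate the state dir cleanup
        System.out.println(canWriteCheckpoint(taskDir)); // directory gone -> false
    }
}
{code}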

We have EOS disabled, so it would be good to hear whether this has also 
happened with EOS enabled.

It would likewise be useful to know whether this warning has occurred for 
anyone with a different topology, or on a high-volume topic.

For reference our topology:
{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [inputTopic])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: store-repartition)
      <-- KSTREAM-FILTER-0000000005

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [store-repartition])
      --> KSTREAM-REDUCE-0000000003
    Processor: KSTREAM-REDUCE-0000000003 (stores: [store])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-REDUCE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: outputTopic)
      <-- KTABLE-TOSTREAM-0000000007
{code}
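
For comparison, a minimal DSL sketch that produces a topology of this shape 
(filter, key-select forcing a repartition, then a named reduce store). Topic 
names and lambdas are placeholders, and it assumes kafka-streams on the 
classpath, so treat it as illustrative rather than a drop-in:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;

public class TopologySketch {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("inputTopic");
        input.filter((k, v) -> v != null)                        // KSTREAM-FILTER (stateless)
             .selectKey((k, v) -> v)                             // KSTREAM-KEY-SELECT, forces repartition
             .groupByKey()
             .reduce((a, b) -> b, Materialized.as("store"))      // KSTREAM-REDUCE with store "store"
             .toStream()                                         // KTABLE-TOSTREAM
             .to("outputTopic");                                 // KSTREAM-SINK
        return builder.build();
    }
}
{code}
The stateless sub-topology 0 ends at the repartition sink, and only 
sub-topology 1 owns the state store, matching the setup described above.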
 



> /.checkpoint.tmp Not found exception
> ------------------------------------
>
>                 Key: KAFKA-5998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5998
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>            Reporter: Yogesh BG
>            Assignee: Bill Bejeck
>            Priority: Critical
>         Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, 
> exc.txt, props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running. They have 
> been running for two days under a load of around 2500 msgs per second. On 
> the third day I started getting the exception below for some of the 
> partitions. I have 16 partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
>         at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
>         at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
>         at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
>         at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
>         at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> }}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
