[ https://issues.apache.org/jira/browse/KAFKA-6767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16562211#comment-16562211 ]
Guozhang Wang commented on KAFKA-6767:
--------------------------------------

More specifically:

In https://issues.apache.org/jira/browse/KAFKA-6499 we have a related fix that tries to avoid writing a checkpoint file. But in https://issues.apache.org/jira/browse/KAFKA-6767 people are still reporting similar issues on 1.1.0. I've looked into the 1.1 source code but cannot find an obvious bug that would cause the checkpointable offsets to be non-empty, i.e. a case where we would still try to write the checkpoint file.

> OffsetCheckpoint write assumes parent directory exists
> ------------------------------------------------------
>
>                 Key: KAFKA-6767
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6767
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Steven Schlansker
>            Priority: Minor
>
> We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e. if an
> instance dies it is created from scratch, rather than reusing the existing
> RocksDB.)
> We routinely see:
> {code:java}
> 2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
> java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
>     at java.io.FileOutputStream.open0(Native Method)
>     at java.io.FileOutputStream.open(FileOutputStream.java:270)
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
>     at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
>     at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
> Inspecting the state store directory, I can indeed see that {{chat/0_11}}
> does not exist (although many other partitions do).
>
> Looking at the OffsetCheckpoint write method, it seems to try to open a new
> checkpoint file without first ensuring that the parent directory exists.
>
> {code:java}
> public void write(final Map<TopicPartition, Long> offsets) throws IOException {
>     // if there is no offsets, skip writing the file to save disk IOs
>     if (offsets.isEmpty()) {
>         return;
>     }
>     synchronized (lock) {
>         // write to temp file and then swap with the existing file
>         final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
>
> Either the OffsetCheckpoint class should initialize the directories if
> needed, or some precondition of it being called should ensure that is the
> case.
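
For illustration, here is a minimal sketch of the first option from the description: the writer creates the parent directory itself before opening the temp file. This is not the actual OffsetCheckpoint code and not necessarily how the issue was or will be fixed. The class name CheckpointSketch, the String keys (standing in for TopicPartition) and the one-entry-per-line file format are all assumptions, chosen to keep the example free of Kafka dependencies.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Hypothetical stand-in for illustration only; not the real OffsetCheckpoint.
class CheckpointSketch {
    private final File file;
    private final Object lock = new Object();

    CheckpointSketch(final File file) {
        this.file = file;
    }

    // Keys are plain strings (an assumption) instead of TopicPartition.
    void write(final Map<String, Long> offsets) throws IOException {
        // nothing to checkpoint, so skip the disk IO entirely
        if (offsets.isEmpty()) {
            return;
        }
        synchronized (lock) {
            final Path target = file.getAbsoluteFile().toPath();

            // create the task directory (and any missing ancestors);
            // this is a no-op when the directory already exists
            Files.createDirectories(target.getParent());

            // write to a temp file next to the target, then swap it in
            final Path temp = target.resolveSibling(target.getFileName() + ".tmp");
            try (BufferedWriter writer = Files.newBufferedWriter(temp, StandardCharsets.UTF_8)) {
                for (final Map.Entry<String, Long> entry : offsets.entrySet()) {
                    // simplified "key offset" line format (an assumption)
                    writer.write(entry.getKey() + " " + entry.getValue());
                    writer.newLine();
                }
            }
            Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

With this shape, a call such as new CheckpointSketch(new File(stateDir, "chat/0_11/.checkpoint")).write(offsets) would no longer depend on the task directory having been created beforehand (stateDir is a placeholder). Whether silently recreating a missing task directory is the right behavior is a separate question, since it may hide whatever removed the directory in the first place.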