Steven Schlansker created KAFKA-6767:
----------------------------------------
Summary: OffsetCheckpoint write assumes parent directory exists
Key: KAFKA-6767
URL: https://issues.apache.org/jira/browse/KAFKA-6767
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.1.0
Reporter: Steven Schlansker
We run Kafka Streams with RocksDB state stores on ephemeral disks (i.e., if an
instance dies, it is recreated from scratch rather than reusing the existing
RocksDB state).
We routinely see:
{code:java}
2018-04-09T19:14:35.004Z WARN <> [chat-0319e3c3-d8b2-4c60-bd69-a8484d8d4435-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager - task [0_11] Failed to write offset checkpoint file to /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint: {}
java.io.FileNotFoundException: /mnt/mesos/sandbox/storage/chat/0_11/.checkpoint.tmp (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:78)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:314)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720){code}
Inspecting the state store directory, I can indeed see that {{chat/0_11}} does
not exist (although many other partitions do).
Looking at the OffsetCheckpoint write method, it seems to try to open a new
checkpoint file without first ensuring that the parent directory exists.
{code:java}
public void write(final Map<TopicPartition, Long> offsets) throws IOException {
    // if there is no offsets, skip writing the file to save disk IOs
    if (offsets.isEmpty()) {
        return;
    }
    synchronized (lock) {
        // write to temp file and then swap with the existing file
        final File temp = new File(file.getAbsolutePath() + ".tmp");{code}
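The failure mode can be reproduced outside Kafka Streams. The following is a minimal sketch, not Kafka code: the class name {{MissingParentRepro}} and the method {{failsWithoutParent}} are made up for illustration. It relies only on the documented behavior that {{java.io.FileOutputStream}} does not create missing parent directories and throws {{FileNotFoundException}} instead.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical reproduction class; not part of Kafka.
public class MissingParentRepro {

    // Tries to open <baseDir>/0_11/.checkpoint.tmp without creating 0_11 first.
    // Returns true if the open failed with FileNotFoundException.
    public static boolean failsWithoutParent(final File baseDir) throws IOException {
        final File temp = new File(new File(baseDir, "0_11"), ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(temp)) {
            return false;
        } catch (final FileNotFoundException e) {
            return true;
        }
    }

    public static void main(final String[] args) throws IOException {
        // A fresh temp directory stands in for the state store base directory.
        final File baseDir = Files.createTempDirectory("repro").toFile();
        System.out.println(failsWithoutParent(baseDir));
    }
}
```

On a fresh base directory the method is expected to return {{true}}, matching the exception in the log above.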
Either the OffsetCheckpoint class should create the parent directories if needed,
or its callers should guarantee that the directory exists before write is called.
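As a rough sketch of the first option, the write path could create any missing parent directories before opening the temp file. This is an illustration under stated assumptions, not a patch against the actual class: {{CheckpointWriteSketch}} is a hypothetical stand-in for OffsetCheckpoint, keys are plain strings rather than {{TopicPartition}} so the example has no Kafka dependency, and the one-entry-per-line file layout is simplified and not Kafka's real checkpoint format.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Hypothetical stand-in for OffsetCheckpoint; not the actual Kafka class.
public class CheckpointWriteSketch {
    private final File file;
    private final Object lock = new Object();

    public CheckpointWriteSketch(final File file) {
        this.file = file;
    }

    // Keys are plain strings (e.g. "chat-11") instead of TopicPartition.
    public void write(final Map<String, Long> offsets) throws IOException {
        // As in the original method, skip the write when there is nothing to record.
        if (offsets.isEmpty()) {
            return;
        }
        synchronized (lock) {
            final Path target = file.getAbsoluteFile().toPath();

            // Assumed fix: make sure the parent directory exists first.
            // Files.createDirectories does nothing if it is already there.
            final Path parent = target.getParent();
            if (parent != null) {
                Files.createDirectories(parent);
            }

            // Write to a temp file, then swap it with the existing file.
            final Path temp = target.resolveSibling(target.getFileName() + ".tmp");
            try (BufferedWriter writer = Files.newBufferedWriter(temp, StandardCharsets.UTF_8)) {
                for (final Map.Entry<String, Long> entry : offsets.entrySet()) {
                    writer.write(entry.getKey() + " " + entry.getValue());
                    writer.newLine();
                }
            }
            Files.move(temp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

Whether silently recreating a task directory is the right behavior is a separate question: if the directory vanished because the task was migrated or cleaned up, a caller-side precondition might be the safer choice.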