Ilyas Toumlilt created KAFKA-19571:
--------------------------------------
Summary: Race condition between log segment flush and file deletion causing log dir to go offline
Key: KAFKA-19571
URL: https://issues.apache.org/jira/browse/KAFKA-19571
Project: Kafka
Issue Type: Bug
Components: core, log
Affects Versions: 3.7.1
Reporter: Ilyas Toumlilt
h1. Context
We are using Kafka v3.7.1 with ZooKeeper. Our brokers are configured with
multiple disks in a JBOD setup, and routine intra-broker data rebalancing is
performed with Cruise Control to manage disk utilization. During these
rebalance operations, a race condition can occur between a log segment flush
and the file deletion that is part of a replica's directory move. The race
leads to a {{NoSuchFileException}} when the flush operation targets a file
path that has just been deleted by the rebalance process, and the resulting
storage error incorrectly forces the broker to take the entire log directory
offline.
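The interaction is easy to demonstrate outside of Kafka. The following
standalone sketch (hypothetical code, not taken from Kafka) shows how a flush
racing with a file deletion produces exactly the exceptions seen below:
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical reproduction of the race: one party holds a channel open for a
// flush while another deletes the file and closes the channel, as a replica
// directory move would.
public final class FlushDeleteRace {
    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("00000000024420850595", ".log");
        FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE);

        // "Rebalance" side: the file is deleted and its channel closed.
        Files.delete(file);
        channel.close();

        // "flush-log" side: forcing the now-closed channel fails with
        // ClosedChannelException; reopening the deleted path would fail with
        // NoSuchFileException instead.
        try {
            channel.force(true);
        } catch (IOException e) {
            System.out.println("Flush failed: " + e);
        }
    }
}
{code}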
h1. Logs / Stack trace
{code:java}
2025-07-23 19:03:30,114 WARN Failed to flush file /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.common.utils.Utils)
java.nio.file.NoSuchFileException: /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
    at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
    at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
    at java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
    at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
    at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
    at org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
    at kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
    at kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
    at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
    at java.base/java.util.Optional.ifPresent(Optional.java:183)
    at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
    at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
    at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
    at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
    at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
    at com.yammer.metrics.core.Timer.time(Timer.java:91)
    at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
    at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at kafka.log.LocalLog.flush(LocalLog.scala:176)
    at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
    at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
    at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
    at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
    at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point 24420850595
Caused by: java.nio.channels.ClosedChannelException
    at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
    at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
    at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
    at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
    at org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
    at com.yammer.metrics.core.Timer.time(Timer.java:91)
    at org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
    at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at kafka.log.LocalLog.flush(LocalLog.scala:176)
    at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
    at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
    at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
    at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
    at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager)
{code}
h1. Stack Trace Analysis
The failure begins with a benign {{WARN}} when a scheduled task tries to flush
a producer state snapshot that was moved during a disk rebalance; this
{{NoSuchFileException}} is anticipated and handled gracefully by the code.
However, the same task then attempts to flush the actual log segment, which
fails with a critical, unhandled {{ClosedChannelException}} because the file
handles were invalidated by the directory move. This unhandled I/O error
propagates up and terminates the background task, causing the
{{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a
direct consequence, the {{ReplicaManager}} detects this fatal storage error
and triggers its safety mechanism, taking the entire log directory offline to
prevent potential data corruption.
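The asymmetry between the two failures is visible in the classes named in the
trace. Below is a hedged paraphrase of the two flush paths (reconstructed from
the trace, not copied from the Kafka sources, which may differ in detail): the
snapshot path tolerates a vanished file, while the segment path has no guard
at all.
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hedged paraphrase of the two flush paths from the stack trace.
final class FlushPaths {
    private static final Logger LOG = LoggerFactory.getLogger(FlushPaths.class);

    // Snapshot path (Utils.flushFileIfExists): a vanished file is caught
    // here, which is why the first failure surfaces only as a WARN.
    static void flushFileIfExists(Path path) throws IOException {
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            channel.force(true);
        } catch (NoSuchFileException e) {
            LOG.warn("Failed to flush file {}", path, e);
        }
    }

    // Segment path (FileRecords.flush): force() on a channel closed by the
    // directory move throws ClosedChannelException, and nothing up the call
    // chain catches it, so it escalates to a KafkaStorageException.
    static void flush(FileChannel channel) throws IOException {
        channel.force(true);
    }
}
{code}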
h1. Expected Behavior
A {{NoSuchFileException}} caused by a concurrent replica move, and the
{{ClosedChannelException}} that follows it, should not cause the entire log
directory to be marked as offline.
h1. Workaround
The current workaround is to manually restart the affected Kafka broker. The
restart clears the in-memory state, and upon re-scanning the log directories,
the broker marks the disk as healthy again.
h1. Proposed fix
The proposed solution is to address the race condition at the lowest possible
level: the {{LogSegment.flush()}} method. The goal is to catch the
{{ClosedChannelException}} that occurs during the race and differentiate it
from a legitimate I/O error.
The fix should be implemented within the {{catch}} block for
{{ClosedChannelException}} in {{LogSegment.java}}. The logic, sketched after
this list, would be as follows:
# When a {{ClosedChannelException}} is caught, check whether the underlying
log segment file still exists ({{log.file().exists()}}).
# *If the file does not exist*, the race condition hypothesis is confirmed:
the replica has been moved or deleted by a rebalance operation. The exception
is benign and should be ignored, with a {{WARN}} message logged for visibility.
# *If the file still exists*, the {{ClosedChannelException}} is unexpected and
could signal a real hardware or filesystem issue. In this case, the exception
should be re-thrown to trigger Kafka's standard log-directory failure-handling
mechanism.
This targeted fix would resolve the bug by gracefully handling the known race
condition without masking other potentially critical storage errors.
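A minimal sketch of that guard, assuming a {{log.file()}} accessor as
described above (the surrounding names are illustrative, not the actual
{{LogSegment}} internals):
{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch of the proposed fix; names other than log.file() are
// assumptions, not the actual LogSegment internals.
final class SegmentFlushGuard {
    private static final Logger LOG = LoggerFactory.getLogger(SegmentFlushGuard.class);

    // Stand-in for the segment's data file plus its flush operation.
    interface SegmentData {
        File file();
        void flush() throws IOException;
    }

    static void flush(SegmentData log) throws IOException {
        try {
            log.flush();
        } catch (ClosedChannelException e) {
            if (!log.file().exists()) {
                // The segment file was moved or deleted by a rebalance while
                // the flush was in flight: benign, log for visibility only.
                LOG.warn("Segment file {} no longer exists, ignoring flush failure",
                        log.file().getAbsolutePath(), e);
            } else {
                // The file is still there, so the closed channel may signal a
                // real storage problem: re-throw to trigger the standard log
                // directory failure handling.
                throw e;
            }
        }
    }
}
{code}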