[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lucian Ilie updated KAFKA-15572:
--------------------------------
    Description: 
We are using a Kafka cluster with ZooKeeper, deployed on top of Kubernetes using banzaicloud/koperator. We have multiple disks per broker.

We are using the Cruise Control remove disk operation to aggregate multiple smaller disks into a single bigger disk. This CC operation calls the Kafka Admin alterReplicaLogDirs operation.

During this operation, *the flush operation fails, apparently at random, with NoSuchFileException while alterReplicaLogDirs is executed*. A sample of the logs for the exception and the preceding operations is attached. The cause of the issue is detailed below.

Say we have 3 brokers:
* broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka
* broker 201 with the same disks
* broker 301 with the same disks

When Cruise Control executes a remove disk operation, it calls Kafka "adminClient.alterReplicaLogDirs(replicaAssignment)" with an assignment that moves all data from /kafka-logs1/kafka and /kafka-logs2/kafka to /new-kafka-logs1/kafka.

During the alter log dir operation, future logs are created (to move data from e.g. "/kafka-logs1/kafka/topic-partition" to "/new-kafka-logs1/kafka/topic-partition.hash-future"), the data is moved, and finally the log dir is renamed from "/new-kafka-logs1/kafka/topic-partition.hash-future" to "/new-kafka-logs1/kafka/topic-partition". This operation is started in [UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713] and is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The rename is then delegated to [LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113]. This is the 1st code part involved in the race condition.
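The final rename happens in two steps: the directory is moved on disk, and only afterwards is the in-memory path updated. That gap is what the race below exploits. The following is a minimal Java sketch under stated assumptions, not Kafka's LocalLog.renameDir: FutureDirPromotion, currentDirName, promote and demo are hypothetical names, and the "<topic>-<partition>.<hash>-future" layout simply follows the naming used in this description.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch, not Kafka code: promotes "<topic>-<partition>.<hash>-future"
// to "<topic>-<partition>" in two steps, both inside one lock.
class FutureDirPromotion {
    static final String FUTURE_SUFFIX = "-future";

    private final Object lock = new Object(); // stands in for the UnifiedLog lock
    private Path dir;                          // stands in for LocalLog._dir

    FutureDirPromotion(Path dir) { this.dir = dir; }

    Path dir() { synchronized (lock) { return dir; } }

    // "my.topic-0.abc123-future" -> "my.topic-0". Topic names may contain dots,
    // so only the last '.' separates the hash.
    static String currentDirName(String futureDirName) {
        if (!futureDirName.endsWith(FUTURE_SUFFIX))
            throw new IllegalArgumentException("not a future dir: " + futureDirName);
        String noSuffix = futureDirName.substring(0, futureDirName.length() - FUTURE_SUFFIX.length());
        int dot = noSuffix.lastIndexOf('.');
        if (dot < 0) throw new IllegalArgumentException("no hash in: " + futureDirName);
        return noSuffix.substring(0, dot);
    }

    // Step 1 renames the directory on disk; step 2 updates the in-memory path.
    // A reader of `dir` that does not take this lock can run between the two steps.
    void promote() {
        synchronized (lock) {
            Path target = dir.resolveSibling(currentDirName(dir.getFileName().toString()));
            try {
                Files.move(dir, target, StandardCopyOption.ATOMIC_MOVE); // step 1
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            dir = target;                                             // step 2
        }
    }

    // Usage: creates a temporary future dir, promotes it, returns the resulting dir name.
    static String demo() {
        try {
            Path root = Files.createTempDirectory("promote");
            Path future = Files.createDirectory(root.resolve("topic-0.abc123-future"));
            FutureDirPromotion log = new FutureDirPromotion(future);
            log.promote();
            if (Files.exists(future) || !Files.isDirectory(log.dir()))
                throw new IllegalStateException("rename did not happen as expected");
            return log.dir().getFileName().toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Here `FutureDirPromotion.demo()` returns "topic-0": the "-future" directory no longer exists and the in-memory path points at the renamed one.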
Meanwhile, the log can be rolled (a new segment is created) based on known conditions (e.g. the active segment getting full), which calls [UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547], which is locked using the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. However, the subsequent delegation to UnifiedLog.flushUptoOffsetExclusive does not hold that lock, since it is [done as a scheduled task in a separate thread|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547]. This means that the further operations are [not locked at UnifiedLog level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268]. The operation is then delegated to [LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177], which also tries to [flush the log dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177]. This is the 2nd code part involved in the race condition.

Since the log dir flush does not share the lock with the rename dir operation (it runs on the scheduler), the rename dir operation might already have moved the log dir on disk to "topic-partition" while LocalLog._dir is still set to "topic-partition.hash-future". When the flush then attempts to flush the "topic-partition.hash-future" directory, it throws NoSuchFileException: "topic-partition.hash-future". Basically, [this line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] might succeed, and before [this other line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111] is executed, flush tries to [flush the future dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
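The interleaving above can be reproduced deterministically in a small Java model. This is a sketch under stated assumptions, not Kafka code and not the patch mentioned in this issue: SketchLog, run and fsyncDir are hypothetical names, the `lock` object stands in for the UnifiedLog lock and the volatile `dir` field for LocalLog._dir. Two CountDownLatch hooks pause the rename between the on-disk move and the field update, so the window is hit every time instead of by chance. `run(false)` flushes without the lock and hits NoSuchFileException on the stale "-future" path; `run(true)` takes the same lock as the rename, one general way of synchronizing the flush dir operation. fsyncDir assumes that opening a directory read-only and calling FileChannel.force works, which holds on Linux but not on Windows.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the rename/flush race, not Kafka code.
class RenameFlushRace {
    static final class SketchLog {
        private final Object lock = new Object(); // stands in for the UnifiedLog lock
        private volatile Path dir;                // stands in for LocalLog._dir

        SketchLog(Path dir) { this.dir = dir; }

        // Rename under the lock: move on disk, signal, wait for the caller, then update the field.
        void renameDir(Path target, CountDownLatch moved, CountDownLatch proceed) throws Exception {
            synchronized (lock) {
                Files.move(dir, target, StandardCopyOption.ATOMIC_MOVE);
                moved.countDown();
                proceed.await();
                dir = target;
            }
        }

        // Flush without the lock, as from a scheduler thread: may read the stale "-future" path.
        void flushDirUnlocked() throws IOException { fsyncDir(dir); }

        // Flush holding the rename lock: never sees the moved-but-not-yet-updated state.
        void flushDirLocked() throws IOException {
            synchronized (lock) { fsyncDir(dir); }
        }
    }

    // fsync a directory by opening it read-only and forcing it (works on Linux; fails on Windows).
    static void fsyncDir(Path p) throws IOException {
        try (FileChannel ch = FileChannel.open(p, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    // Forces the interleaving from the description; returns the simple class name of the
    // exception thrown by the flush, or "none" if the flush succeeded.
    static String run(boolean flushTakesLock) {
        try {
            Path root = Files.createTempDirectory("race");
            SketchLog log = new SketchLog(Files.createDirectory(root.resolve("topic-0.abc123-future")));
            CountDownLatch moved = new CountDownLatch(1);
            CountDownLatch proceed = new CountDownLatch(1);
            Thread renamer = new Thread(() -> {
                try { log.renameDir(root.resolve("topic-0"), moved, proceed); }
                catch (Exception e) { throw new RuntimeException(e); }
            });
            renamer.start();
            moved.await(); // the dir is renamed on disk, but `dir` still holds the old path

            String result = "none";
            if (!flushTakesLock) {
                try { log.flushDirUnlocked(); }
                catch (IOException e) { result = e.getClass().getSimpleName(); }
                proceed.countDown();
            } else {
                AtomicReference<IOException> error = new AtomicReference<>();
                Thread flusher = new Thread(() -> {
                    try { log.flushDirLocked(); } catch (IOException e) { error.set(e); }
                });
                flusher.start();      // blocks on the lock until renameDir has updated `dir`
                proceed.countDown();
                flusher.join();
                if (error.get() != null) result = error.get().getClass().getSimpleName();
            }
            renamer.join();
            return result;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this model the locked flush simply waits until the rename has finished and then syncs the renamed directory, so the stale "-future" path is never observed.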
We tested a patch on Kafka 3.4.1 that synchronizes the flush dir operation, and it solved the issue on our clusters. We will reply with a link to a PR.

Note that this bug reproduces in every version since 3.0.0. It was introduced by [this commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341], which added the log dir flush.

> Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15572
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15572
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.0.0
>            Reporter: Lucian Ilie
>            Priority: Major
>         Attachments: kafka-alter-log-dir-nosuchfileexception.log
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)