[ https://issues.apache.org/jira/browse/KAFKA-15572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucian Ilie updated KAFKA-15572:
--------------------------------
    Description: 
We are running a Kafka cluster with ZooKeeper, deployed on Kubernetes using 
banzaicloud/koperator.

We have multiple disks per broker.
We use the Cruise Control remove disk operation to consolidate several smaller 
disks into a single larger disk. This Cruise Control (CC) operation calls the 
Kafka Admin client's alterReplicaLogDirs operation.

During this operation, *the flush operation fails, apparently at random, with 
NoSuchFileException while alterReplicaLogDirs is executing*. A sample of the 
logs showing the exception and the operations preceding it is attached.

The cause of this issue is detailed below.

Suppose we have 3 brokers:
 * broker 101 with disks /kafka-logs1/kafka, /kafka-logs2/kafka and a bigger disk /new-kafka-logs1/kafka
 * broker 201 with the same disks
 * broker 301 with the same disks

When Cruise Control executes a remove disk operation, it calls Kafka 
"adminClient.alterReplicaLogDirs(replicaAssignment)" with an assignment that 
moves all data from /kafka-logs1/kafka and /kafka-logs2/kafka to 
/new-kafka-logs1/kafka.
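A minimal sketch of how such an assignment could be built for broker 101. It assumes two hypothetical records, `ReplicaKey` (a stand-in for Kafka's `TopicPartitionReplica`) and `ReplicaPlacement` (a stand-in for the placement information Cruise Control holds), so that it compiles without kafka-clients. In real code the resulting map would be passed to `Admin.alterReplicaLogDirs(Map<TopicPartitionReplica, String>)`; this is not Cruise Control's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartitionReplica.
record ReplicaKey(String topic, int partition, int brokerId) {}

// Hypothetical stand-in for "replica X currently lives in log dir Y".
record ReplicaPlacement(ReplicaKey replica, String logDir) {}

final class RemoveDiskPlan {
    // Targets every replica that lives on one of the removed dirs at the new dir.
    // Replicas already on the target dir (or on any other dir) are left out.
    static Map<ReplicaKey, String> buildAssignment(List<ReplicaPlacement> placements,
                                                   List<String> removedDirs,
                                                   String targetDir) {
        Map<ReplicaKey, String> assignment = new LinkedHashMap<>();
        for (ReplicaPlacement p : placements) {
            if (removedDirs.contains(p.logDir())) {
                assignment.put(p.replica(), targetDir);
            }
        }
        return assignment;
    }
}
```

For broker 101, replicas on /kafka-logs1/kafka and /kafka-logs2/kafka are mapped to /new-kafka-logs1/kafka, while a replica that already lives on /new-kafka-logs1/kafka is not included in the map.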

During the alter log dir operation, future logs are created (to move data from, 
e.g., "/kafka-logs1/kafka/topic-partition" to 
"/new-kafka-logs1/kafka/topic-partition.hash-future"), the data is moved and, 
finally, the future log dir is renamed from 
"/new-kafka-logs1/kafka/topic-partition.hash-future" to 
"/new-kafka-logs1/kafka/topic-partition". This rename starts in 
[UnifiedLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L713]
 and runs while holding the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The rename is then delegated to 
[LocalLog.renameDir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111-L113].
 This is the first code path involved in the race condition.
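The rename path can be modelled roughly as below. This is a simplified Java sketch, not Kafka's Scala code: a `ReentrantLock` stands in for the UnifiedLog lock, a `volatile Path dir` field stands in for `LocalLog._dir`, and `topic-0.abc123-future` is a made-up instance of the `topic-partition.hash-future` pattern. What it shows is that the rename consists of two separate steps: moving the directory on disk, then updating the in-memory field.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of UnifiedLog.renameDir -> LocalLog.renameDir.
final class RenameSketch {
    private final ReentrantLock lock = new ReentrantLock(); // UnifiedLog lock stand-in
    private volatile Path dir;                               // LocalLog._dir stand-in

    RenameSketch(Path dir) { this.dir = dir; }

    Path dir() { return dir; }

    // Outer layer: the whole rename runs while holding the log lock.
    void renameDir(String name) {
        lock.lock();
        try {
            localRenameDir(name);
        } finally {
            lock.unlock();
        }
    }

    // Inner layer: (1) move the directory on disk, (2) update the field.
    private void localRenameDir(String name) {
        Path renamed = dir.resolveSibling(name);
        try {
            Files.move(dir, renamed, StandardCopyOption.ATOMIC_MOVE); // step 1
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        dir = renamed;                                               // step 2
    }

    // Demo: promote "topic-0.abc123-future" to "topic-0" inside a temp log dir.
    // Returns {old future path, path the log points at after the rename}.
    static Path[] demo() {
        try {
            Path logDir = Files.createTempDirectory("new-kafka-logs1");
            Path future = Files.createDirectory(logDir.resolve("topic-0.abc123-future"));
            RenameSketch log = new RenameSketch(future);
            log.renameDir("topic-0");
            return new Path[] {future, log.dir()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```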

Meanwhile, the future log can be rolled (i.e. a new segment is created) when the 
usual conditions are met (e.g. the active segment becomes full). This calls 
[UnifiedLog.roll|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547],
 which also holds the [UnifiedLog lock|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 However, the subsequent call to UnifiedLog.flushUptoOffsetExclusive does not 
share that lock, because it is [run as a scheduled task on a separate thread|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L1547].
 As a result, the operations that follow are [not locked at the UnifiedLog level|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/UnifiedLog.scala#L268].
 The flush is then delegated to 
[LocalLog.flush|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177],
 which also [flushes the log dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
 This is the second code path involved in the race condition.
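A similarly simplified sketch of the roll path shows why the lock does not protect the flush. `RollSketch` is hypothetical and a single-thread executor stands in for the Kafka scheduler: `roll()` holds the lock only while it submits the flush task, and the task itself runs on another thread that never acquires the lock.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of the roll path (not Kafka's code).
final class RollSketch {
    private final ReentrantLock lock = new ReentrantLock();                       // UnifiedLog lock stand-in
    private final ExecutorService scheduler = Executors.newSingleThreadExecutor(); // scheduler stand-in

    // roll() runs under the lock, but only *schedules* the flush.
    Future<Boolean> roll() {
        lock.lock();
        try {
            // ... a new segment would be created here, under the lock ...
            Callable<Boolean> flush = this::flushUpToOffsetExclusive;
            return scheduler.submit(flush);
        } finally {
            lock.unlock();
        }
    }

    // Runs later on the scheduler thread; reports whether it holds the log lock.
    private boolean flushUpToOffsetExclusive() {
        return lock.isHeldByCurrentThread();
    }

    // Rolls once and returns whether the scheduled flush ran under the lock.
    static boolean scheduledFlushHeldLock() {
        RollSketch log = new RollSketch();
        try {
            return log.roll().get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            log.scheduler.shutdown();
        }
    }
}
```

`RollSketch.scheduledFlushHeldLock()` returns `false`: whatever the flush does to the log directory happens outside the lock that guards the rename.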

Since the log dir flush does not share the lock with the rename operation (it is 
run by the scheduler), the rename may already have moved the log dir on disk to 
"topic-partition" while LocalLog._dir is still set to 
"topic-partition.hash-future". When the flush then tries to flush the 
"topic-partition.hash-future" directory, it throws NoSuchFileException: 
"topic-partition.hash-future". In other words, [this line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 (the move on disk) might succeed, and before [this other line|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L111]
 (the update of LocalLog._dir) is executed, the flush tries to [flush the future dir|https://github.com/apache/kafka/blob/3.4/core/src/main/scala/kafka/log/LocalLog.scala#L177].
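The interleaving can be reproduced deterministically with a hypothetical model that uses two `CountDownLatch`es to pause the rename between its two steps while an unlocked flush runs. The directory flush is done the usual JVM way, opening the directory with `FileChannel.open(dir, READ)` and calling `force(true)`; this assumes a Linux/Unix file system, since opening a directory this way fails on Windows. All names are illustrative, not Kafka's.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model that forces the problematic interleaving with latches.
final class FlushRaceSketch {
    private final ReentrantLock lock = new ReentrantLock(); // UnifiedLog lock stand-in
    private volatile Path dir;                               // LocalLog._dir stand-in
    private final CountDownLatch movedOnDisk = new CountDownLatch(1);
    private final CountDownLatch flushAttempted = new CountDownLatch(1);

    FlushRaceSketch(Path dir) { this.dir = dir; }

    // Rename path: holds the lock, but pauses between its two steps.
    void renameDir(String name) throws IOException, InterruptedException {
        lock.lock();
        try {
            Path renamed = dir.resolveSibling(name);
            Files.move(dir, renamed, StandardCopyOption.ATOMIC_MOVE); // step 1 succeeds
            movedOnDisk.countDown();
            flushAttempted.await();                                   // hold the window open
            dir = renamed;                                            // step 2
        } finally {
            lock.unlock();
        }
    }

    // Scheduled flush path: does NOT take the lock, so it reads the stale dir.
    void flushDir() throws IOException, InterruptedException {
        movedOnDisk.await();
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true); // fsync the directory
        } finally {
            flushAttempted.countDown();
        }
    }

    // Runs both paths concurrently and returns the flush's exception, if any.
    static Throwable reproduce() {
        try {
            Path logDir = Files.createTempDirectory("new-kafka-logs1");
            Path future = Files.createDirectory(logDir.resolve("topic-0.abc123-future"));
            FlushRaceSketch log = new FlushRaceSketch(future);
            Throwable[] flushError = new Throwable[1];
            Thread renamer = new Thread(() -> {
                try { log.renameDir("topic-0"); } catch (Exception e) { throw new RuntimeException(e); }
            });
            Thread flusher = new Thread(() -> {
                try { log.flushDir(); } catch (Exception e) { flushError[0] = e; }
            });
            renamer.start();
            flusher.start();
            renamer.join();
            flusher.join();
            return flushError[0];
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

`reproduce()` returns a `NoSuchFileException` whose file is the `...-future` path, which mirrors the exception described in this issue.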

We tested a patch on Kafka 3.4.1 on our clusters that synchronizes the flush dir 
operation, and it solved the issue. We will reply with a link to a PR.
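One possible shape of such a fix, assuming that "synchronizing the flush dir operation" means the directory flush reads `dir` and fsyncs it while holding the same lock as the rename; the actual patch may synchronize differently. The model is hypothetical and reuses the same Linux/Unix directory-fsync assumption as before. The flusher may start in the middle of the rename, but it must wait for the lock, so it always sees the renamed directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of a synchronized directory flush (not the actual patch).
final class SynchronizedFlushSketch {
    private final ReentrantLock lock = new ReentrantLock(); // UnifiedLog lock stand-in
    private volatile Path dir;                               // LocalLog._dir stand-in
    private final CountDownLatch movedOnDisk = new CountDownLatch(1);

    SynchronizedFlushSketch(Path dir) { this.dir = dir; }

    void renameDir(String name) throws IOException {
        lock.lock();
        try {
            Path renamed = dir.resolveSibling(name);
            Files.move(dir, renamed, StandardCopyOption.ATOMIC_MOVE); // step 1
            movedOnDisk.countDown();                                  // let the flusher start mid-rename
            dir = renamed;                                            // step 2
        } finally {
            lock.unlock();
        }
    }

    // Fixed flush: reads dir and fsyncs it under the same lock, so it can
    // never observe the state between step 1 and step 2 of the rename.
    Path flushDir() throws IOException, InterruptedException {
        movedOnDisk.await();
        lock.lock();
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
            return dir;
        } finally {
            lock.unlock();
        }
    }

    // Returns the flushed Path on success, or the flush's exception on failure.
    static Object run() {
        try {
            Path logDir = Files.createTempDirectory("new-kafka-logs1");
            Path future = Files.createDirectory(logDir.resolve("topic-0.abc123-future"));
            SynchronizedFlushSketch log = new SynchronizedFlushSketch(future);
            Object[] result = new Object[1];
            Thread flusher = new Thread(() -> {
                try { result[0] = log.flushDir(); } catch (Exception e) { result[0] = e; }
            });
            Thread renamer = new Thread(() -> {
                try { log.renameDir("topic-0"); } catch (IOException e) { throw new UncheckedIOException(e); }
            });
            flusher.start();
            renamer.start();
            renamer.join();
            flusher.join();
            return result[0];
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Here `run()` returns the path of the renamed `topic-0` directory instead of an exception. In this sketch only the directory fsync runs under the lock, which keeps the extra contention on the log lock small.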

Note that this bug reproduces in every version since 3.0.0. It was introduced by [this commit|https://github.com/apache/kafka/commit/db3e5e2c0de367ffcfe4078359d6d208ba722581#diff-eeafed82ed6a8600c397b108787fdf31e03191b0a192774a65c127d0d26edc44R2341],
 which added the log dir flush.

> Race condition between future log dir roll and replace current with future log during alterReplicaLogDirs
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15572
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15572
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.0.0
>            Reporter: Lucian Ilie
>            Priority: Major
>         Attachments: kafka-alter-log-dir-nosuchfileexception.log
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
