Sagar Rao created KAFKA-16056:
---------------------------------
Summary: Worker poll timeout expiry can lead to Duplicate task
assignments.
Key: KAFKA-16056
URL: https://issues.apache.org/jira/browse/KAFKA-16056
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao
When a worker's poll timeout expires, the worker proactively leaves the group,
which triggers a rebalance. Under normal circumstances, this departure would
trigger a scheduled rebalance delay. Note, however, that the worker which
temporarily left the group does not give up its assignments: whatever tasks
were running on it keep running as before. When the scheduled rebalance delay
elapses, the worker simply gets its assignments back, and since no revocations
are involved, everything works out fine.
But there is an edge case here. Suppose a scheduled rebalance delay is already
active on a group and, just before the follow-up rebalance that fires when that
delay elapses, the poll timeout of one of the workers expires. At this point a
rebalance is imminent, and the leader would track the assignments of the
transiently departed worker as lost
[here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255].
When
[handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441]
is triggered, the scheduledRebalance delay has not been reset yet, so if
[this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473]
condition passes, the leader assumes that it needs to reassign all the lost
assignments, and it does so.
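The edge case can be sketched with a deliberately simplified, hypothetical model of the leader's lost-assignment handling. It is only loosely modeled on `handleLostAssignments` and the linked condition (assumed here to have the form `scheduledRebalance > 0 && now >= scheduledRebalance`); the real assignor does much more, such as handling workers that rejoin during the delay. `maxDelayMs` is assumed to play the role of `scheduled.rebalance.max.delay.ms`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of the leader's handling of lost
// assignments; not the real IncrementalCooperativeAssignor code.
class LostAssignmentModel {
    private final long maxDelayMs;            // plays the role of scheduled.rebalance.max.delay.ms
    private long scheduledRebalance = 0;      // 0 means "no delay is active"
    private final Set<String> lost = new LinkedHashSet<>();

    LostAssignmentModel(long maxDelayMs) {
        if (maxDelayMs <= 0) throw new IllegalArgumentException("maxDelayMs must be positive");
        this.maxDelayMs = maxDelayMs;
    }

    // Called on each rebalance with the tasks that just went missing and the
    // workers still in the group. Returns the tasks reassigned in this round.
    Map<String, List<String>> onRebalance(long now, Set<String> newlyLost, List<String> remainingWorkers) {
        lost.addAll(newlyLost);
        Map<String, List<String>> reassigned = new LinkedHashMap<>();
        if (lost.isEmpty()) {
            return reassigned;
        }
        if (scheduledRebalance == 0) {
            // First sign of lost work: give the departed worker time to come back.
            scheduledRebalance = now + maxDelayMs;
        } else if (now >= scheduledRebalance && !remainingWorkers.isEmpty()) {
            // Delay elapsed: every task currently marked lost is redistributed,
            // including tasks of a worker that only just dropped out of the group.
            int i = 0;
            for (String task : lost) {
                String target = remainingWorkers.get(i++ % remainingWorkers.size());
                reassigned.computeIfAbsent(target, k -> new ArrayList<>()).add(task);
            }
            lost.clear();
            scheduledRebalance = 0;
        }
        return reassigned;
    }

    public static void main(String[] args) {
        LostAssignmentModel leader = new LostAssignmentModel(300_000);
        List<String> remaining = List.of("worker-2", "worker-3");
        // t=0: worker-A leaves; a delay starts and nothing is reassigned. prints "{}"
        System.out.println(leader.onRebalance(0, Set.of("a-task-0"), remaining));
        // t=300000: worker-B's poll timeout expired just before the delay elapsed.
        Set<String> fromB = new LinkedHashSet<>(List.of("b-task-0", "b-task-1"));
        // prints "{worker-2=[a-task-0, b-task-1], worker-3=[b-task-0]}"
        System.out.println(leader.onRebalance(300_000, fromB, remaining));
    }
}
```

In this model, worker-B's tasks never get a delay of their own: they are swept up by the delay that worker-A's departure started, and are redistributed while worker-B is still running them.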
But because the worker whose poll timeout expired does not rescind its
assignments, we end up with duplicate assignments: one set on the original
worker, which was already running the tasks and connectors, and another set on
the remaining workers in the group, which received the redistributed work. This
could lead to task failures if a connector has been written in a way that
expects no duplicate tasks to run across a cluster.
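To make the resulting state concrete, a small hypothetical helper can flag tasks that run on more than one worker. The worker and task names are made up, and the input assumes we know what each worker is actually running.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical check: which tasks are running on more than one worker?
class DuplicateTaskCheck {
    static Map<String, List<String>> findDuplicates(Map<String, Set<String>> runningByWorker) {
        // Invert worker -> tasks into task -> workers (sorted by task name).
        Map<String, List<String>> owners = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : runningByWorker.entrySet()) {
            for (String task : e.getValue()) {
                owners.computeIfAbsent(task, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Keep only tasks with two or more owners.
        owners.values().removeIf(workers -> workers.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> running = new LinkedHashMap<>();
        // worker-1's poll timeout expired, but it never stopped its tasks.
        running.put("worker-1", new LinkedHashSet<>(List.of("task-0", "task-1")));
        // The remaining workers received the redistributed "lost" tasks.
        running.put("worker-2", new LinkedHashSet<>(List.of("task-0")));
        running.put("worker-3", new LinkedHashSet<>(List.of("task-1")));
        // prints "{task-0=[worker-1, worker-2], task-1=[worker-1, worker-3]}"
        System.out.println(findDuplicates(running));
    }
}
```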
This edge case is also more likely to be hit when `rebalance.timeout.ms` is set
to a lower value, since a shorter timeout makes poll timeout expiry more likely.
One possible approach is to follow
https://issues.apache.org/jira/browse/KAFKA-9184, where, upon coordinator
discovery failure, the worker gives up all its assignments and joins with an
empty assignment. The worker could do the same when its poll timeout expires.
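The proposed behavior can be sketched with a hypothetical worker model (none of these names are Connect APIs): on poll timeout expiry the worker stops everything it runs and reports an empty assignment when it rejoins, so the leader's reassignment of the lost work cannot overlap with anything still running.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of "give up all assignments and rejoin empty" on poll
// timeout expiry, in the spirit of KAFKA-9184; names are illustrative only.
class RevokeOnPollTimeout {
    private final Set<String> assignment = new LinkedHashSet<>();
    private final Set<String> stopped = new LinkedHashSet<>();

    void assign(String connectorOrTask) {
        assignment.add(connectorOrTask);
    }

    // Stop every connector and task before rejoining, so nothing keeps
    // running while the leader treats this work as lost.
    void onPollTimeoutExpired() {
        stopped.addAll(assignment);   // a real worker would stop each one here
        assignment.clear();
    }

    // What the worker reports as its current assignment when it rejoins.
    Set<String> assignmentOnRejoin() {
        return Collections.unmodifiableSet(new LinkedHashSet<>(assignment));
    }

    Set<String> stopped() {
        return Collections.unmodifiableSet(stopped);
    }

    public static void main(String[] args) {
        RevokeOnPollTimeout worker = new RevokeOnPollTimeout();
        worker.assign("conn-0");
        worker.assign("conn-0-task-0");
        worker.onPollTimeoutExpired();
        // prints "stopped=[conn-0, conn-0-task-0] rejoinWith=[]"
        System.out.println("stopped=" + worker.stopped() + " rejoinWith=" + worker.assignmentOnRejoin());
    }
}
```

The trade-off is that the timed-out worker's tasks are interrupted even when the worker would have recovered quickly, in exchange for never running duplicates.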
--
This message was sent by Atlassian Jira
(v8.20.10#820010)