Yu Wang created KAFKA-9841:
------------------------------

             Summary: Connector and Task duplicated when a worker join with old 
generation assignment
                 Key: KAFKA-9841
                 URL: https://issues.apache.org/jira/browse/KAFKA-9841
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 2.4.1, 2.4.0
            Reporter: Yu Wang


This occurs when IncrementalCooperativeAssignor.class is used to assign 
connectors and tasks.

Suppose a worker 'W' has a connection issue with the coordinator.

During the connection issue, the connectors/tasks on 'W' are assigned to the 
other workers.

When the connection issue is resolved, 'W' rejoins the group with an 
old-generation assignment. The group leader then sees duplicated 
connectors/tasks in the metadata sent by the workers, but the duplicated 
connectors/tasks are not revoked.

In the logs below, Worker3 misses generation 4 and its task misc-3 is 
assigned to Worker4. At generation 5, both Worker3 and Worker4 hold misc-3.
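
The duplicate condition can be illustrated with a minimal Python sketch. This 
is not Kafka Connect code: the helper name find_duplicates and the dictionary 
layout are assumptions made for illustration only. The sample data is the set 
of task assignments the workers report at generation 5 in the logs of this 
report.

```python
from collections import defaultdict


def find_duplicates(assignments):
    """Return {task_id: sorted list of workers} for tasks held by more than one worker.

    `assignments` maps a worker name to the task ids it reports as running.
    Illustrative helper only; it is not part of Kafka Connect.
    """
    owners = defaultdict(list)
    for worker, tasks in assignments.items():
        for task in tasks:
            owners[task].append(worker)
    return {task: sorted(ws) for task, ws in owners.items() if len(ws) > 1}


# Task assignments reported at generation 5 in the logs of this report.
generation_5 = {
    "Worker1": ["misc-0"],
    "Worker2": ["misc-4"],
    "Worker3": ["misc-3"],
    "Worker4": ["misc-3", "misc-1"],
    "Worker5": ["misc-5", "misc-2"],
}

duplicates = find_duplicates(generation_5)
print(duplicates)  # {'misc-3': ['Worker3', 'Worker4']}
```

A leader that ran a check of this kind on the metadata it receives could spot 
that misc-3 has two owners. How the duplicate should then be revoked is a 
separate question that this sketch does not address.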

 

Generation 3:

Worker1:

[2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], 
taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker2:

[2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], 
taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker3:

[2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], 
taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker4:

[2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[misc], 
taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker5:

[2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], 
taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

 

Generation 4:

Worker1:

[2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker2:

[2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker3:

[2020-03-17 04:32:35,489] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Group coordinator xxxxxx:9092 (id: 
2147483631 rack: null) is unavailable or invalid, will attempt rediscovery 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-17 04:32:35,590] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Discovered group coordinator xxxxxx:9092 
(id: 2147483631 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} Committing offsets 
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} flushing 86 
outstanding messages for offset commit 
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Attempt to heartbeat failed since group 
is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Rebalance started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

Worker4:

[2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], 
taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker5:

[2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

 

Generation 5:

Worker1:

[2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker2:

[2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker3:

[2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker4:

[2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], 
taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Worker5:

[2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, 
groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with 
protocol version 2 and got assignment: Assignment{error=0, 
leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], 
taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
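
To compare generations across many workers, the "Joined group" lines can be 
reduced to a (generation, taskIds) pair. The following Python sketch shows one 
way to do that. The helper name parse_assignment and the two regular 
expressions are assumptions for illustration, not an existing tool, and the 
sketch expects each log entry to be on a single unwrapped line.

```python
import re

# Patterns for the two fields of interest in a DistributedHerder
# "Joined group" line. "taskIds" is matched case-sensitively, so the
# "revokedTaskIds" field is not picked up by mistake.
GENERATION = re.compile(r"Joined group at generation (\d+)")
TASK_IDS = re.compile(r"taskIds=\[([^\]]*)\]")


def parse_assignment(line):
    """Return (generation, [task ids]) or None if the line is not a join line.

    Hypothetical helper for reading the logs in this report.
    """
    generation = GENERATION.search(line)
    tasks = TASK_IDS.search(line)
    if generation is None or tasks is None:
        return None
    task_ids = [t.strip() for t in tasks.group(1).split(",") if t.strip()]
    return int(generation.group(1)), task_ids


# Worker4's generation 5 entry from this report, joined onto one line
# (leader and URL fields shortened with "..." for readability).
sample = (
    "[2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, "
    "groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with "
    "protocol version 2 and got assignment: Assignment{error=0, ..., "
    "connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], "
    "revokedTaskIds=[], delay=0} with rebalance delay: 0"
)

parsed = parse_assignment(sample)
print(parsed)  # (5, ['misc-3', 'misc-1'])
```

Lines that do not report a join, such as the "Rebalance started" entries for 
Worker3 at generation 4, return None and can be skipped.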

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
