[ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17132901#comment-17132901 ]
Yu Wang commented on KAFKA-9841:
--------------------------------

Thank you for checking [~vvcephei] and thank you for your help [~kkonstantine].

> Connector and Task duplicated when a worker join with old generation assignment
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9841
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9841
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>   Affects Versions: 2.4.0, 2.3.1, 2.4.1
>            Reporter: Yu Wang
>            Assignee: Yu Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> This applies when IncrementalCooperativeAssignor.class is used to assign connectors and tasks.
> Suppose a worker 'W' has a connection issue with the coordinator. While the issue lasts, the connectors/tasks on 'W' are assigned to the other workers.
> When the connection issue clears, 'W' rejoins the group with its old-generation assignment. The group leader then receives duplicated connectors/tasks in the metadata sent by the workers, but the duplicated connectors/tasks are not revoked.
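The failure mode described above can be illustrated with a minimal Java sketch. The sketch assumes that each worker's reported assignment is available as a plain map from worker name to task IDs.

It also assumes a revocation policy: a duplicated task is removed from every worker that reports it, so that a follow-up rebalance places it on exactly one worker. The class and method names are hypothetical. This is not Kafka's IncrementalCooperativeAssignor API, and it is not taken from the fix for this issue.

The sample data mirrors the logs below. In those logs, Worker3 rejoins with its generation-3 assignment while Worker4 already holds misc-3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch, not Kafka's IncrementalCooperativeAssignor implementation.
final class DuplicateAssignmentSketch {

    // Returns the task IDs that appear in more than one worker's reported assignment.
    static Set<String> findDuplicates(Map<String, List<String>> reported) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<String> tasks : reported.values()) {
            for (String task : tasks) {
                counts.merge(task, 1, Integer::sum);
            }
        }
        Set<String> duplicates = new TreeSet<>();
        counts.forEach((task, n) -> {
            if (n > 1) duplicates.add(task);
        });
        return duplicates;
    }

    // Assumed policy: revoke each duplicated task from every worker that reports it,
    // leaving it unassigned so a follow-up rebalance can place it on exactly one worker.
    static Map<String, List<String>> revocations(Map<String, List<String>> reported) {
        Set<String> duplicates = findDuplicates(reported);
        Map<String, List<String>> revoke = new TreeMap<>();
        reported.forEach((worker, tasks) -> {
            List<String> toRevoke = new ArrayList<>();
            for (String task : tasks) {
                if (duplicates.contains(task)) toRevoke.add(task);
            }
            if (!toRevoke.isEmpty()) revoke.put(worker, toRevoke);
        });
        return revoke;
    }

    public static void main(String[] args) {
        // Assumed view of the leader when computing generation 5: Worker3 reports its stale
        // generation-3 assignment, while Worker4 reports its generation-4 assignment.
        Map<String, List<String>> reported = new TreeMap<>();
        reported.put("Worker1", List.of("misc-0"));
        reported.put("Worker2", List.of("misc-4"));
        reported.put("Worker3", List.of("misc-3"));
        reported.put("Worker4", List.of("misc-3", "misc-1"));
        reported.put("Worker5", List.of("misc-5", "misc-2"));

        System.out.println(findDuplicates(reported)); // [misc-3]
        System.out.println(revocations(reported));    // {Worker3=[misc-3], Worker4=[misc-3]}
    }
}
```

Revoking every copy is the simplest policy to reason about. Its cost is briefly stopping a task that a current-generation worker was running correctly.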
>
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[misc], taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', leaderUrl='http://xxxxxx-2:8083/', offset=514, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:32:35,489] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Group coordinator xxxxxx:9092 (id: 2147483631 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:35,590] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Discovered group coordinator xxxxxx:9092 (id: 2147483631 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 04:32:36,910] INFO WorkerSourceTask{id=misc-3} flushing 86 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Attempt to heartbeat failed since group is rebalancing (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
> [2020-03-17 04:32:37,164] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> Worker4:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>
> Generation 5:
> Worker1:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:32:42,756] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:32:42,757] INFO [Worker clientId=connect-1, groupId=xxxxxx_mm2_fb__connect__group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', leaderUrl='http://xxxxxx-4:8083/', offset=515, connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
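The duplication in the quoted generation-5 assignments can be confirmed with a small parser instead of reading each line by eye. The parser pulls the taskIds=[...] field out of each worker's assignment line.

This is a hypothetical helper, not part of Kafka. The input strings are trimmed excerpts of the generation-5 log lines in the report. The sketch assumes task IDs never contain commas or brackets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading DistributedHerder "Joined group ... got assignment" lines.
final class GenerationLogCheck {
    // Matches the taskIds=[...] field; "revokedTaskIds" does not match because of its capital T.
    private static final Pattern TASK_IDS = Pattern.compile("taskIds=\\[([^\\]]*)\\]");

    // Extracts the assigned task IDs from one assignment log line (empty list if none).
    static List<String> taskIds(String logLine) {
        Matcher m = TASK_IDS.matcher(logLine);
        List<String> ids = new ArrayList<>();
        if (m.find() && !m.group(1).isBlank()) {
            for (String id : m.group(1).split(",")) ids.add(id.trim());
        }
        return ids;
    }

    // Maps each task ID assigned to two or more workers to the list of those workers.
    static Map<String, List<String>> duplicatedTasks(Map<String, String> lineByWorker) {
        Map<String, List<String>> owners = new TreeMap<>();
        lineByWorker.forEach((worker, line) -> {
            for (String id : taskIds(line)) {
                owners.computeIfAbsent(id, k -> new ArrayList<>()).add(worker);
            }
        });
        owners.values().removeIf(workers -> workers.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        // Trimmed excerpts of the generation-5 assignment lines quoted in the report.
        Map<String, String> gen5 = new TreeMap<>();
        gen5.put("Worker1", "connectorIds=[], taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[]");
        gen5.put("Worker2", "connectorIds=[], taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[]");
        gen5.put("Worker3", "connectorIds=[], taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[]");
        gen5.put("Worker4", "connectorIds=[misc], taskIds=[misc-3, misc-1], revokedConnectorIds=[], revokedTaskIds=[]");
        gen5.put("Worker5", "connectorIds=[], taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[]");

        System.out.println(duplicatedTasks(gen5)); // {misc-3=[Worker3, Worker4]}
    }
}
```

Because every generation-5 line shows revokedTaskIds=[], the check confirms the report: misc-3 remains assigned to both Worker3 and Worker4, and neither copy is revoked.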