[
https://issues.apache.org/jira/browse/KAFKA-9184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976648#comment-16976648
]
Timur Rubeko edited comment on KAFKA-9184 at 11/18/19 3:50 PM:
---------------------------------------------------------------
Hello. SO question author here.
The following is an example of a sequence of events that typically leads to
redundant task creation. Set-up: three workers and three connectors. Relevant
logs:
*Worker A*:
{code:java}
[2019-11-03 11:07:26,912] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 639 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[mqtt-source],
taskIds=[mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[],
delay=300000}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,192] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 640 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink,
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0,
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 641 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink,
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0,
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 642 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink,
another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0,
mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
*Worker B*:
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 639 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250,
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0],
revokedConnectorIds=[], revokedTaskIds=[], delay=300000}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,041] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id
connect-1-bf534716-be2f-4cb3-9f26-521023c6b504 is not valid.
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,251] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 641 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250,
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0],
revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:03,150] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id
connect-1-c930bdb9-eedf-4313-95e0-4a6927836094 is not valid.
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 642 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250,
connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0],
revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
*Worker C*:
{code:java}
[2019-11-03 11:07:26,911] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 639 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink],
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=300000}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 11:12:06,006] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Attempt to heartbeat failed for since member id
connect-1-d9a36d68-ab64-4404-a2aa-e5eaf3249ec4 is not valid.
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:947)
[2019-11-03 11:12:09,247] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 641 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink],
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
[2019-11-03 12:49:05,632] INFO [Worker clientId=connect-1,
groupId=ingest-sources-cluster] Joined group at generation 642 and got
assignment: Assignment{error=0,
leader='connect-1-7d69fcf2-1025-418b-9091-4054b984d18f',
leaderUrl='http://10.16.0.18:8083/', offset=250, connectorIds=[hdfs-sink],
taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:1397)
{code}
My interpretation of these events:
- Generation 639: tasks are evenly distributed between workers.
- Generation 640: workers B and C are missing from generation 640, presumably
because their heartbeats fail (see the `Attempt to heartbeat failed for since
member id <member id> is not valid` messages logged at 11:12:06). Only worker A
is left in generation 640, and it is assigned all of the tasks, including those
that were previously assigned to workers B and C (`hdfs-sink-0` and
`another-hdfs-sink-0` in this example).
- Generation 641: this generation is created just 3 seconds after generation
640. Workers B and C do *not release* the tasks that were reassigned to worker
A, nor does worker A release them. At this point these tasks are effectively
"duplicated".
- Subsequent generations: further rebalancing does not fix the issue -
redundant tasks are kept (generation 642 is given in these logs as an example).
The only remedy is to restart *all* workers. In my tests, restarting only one
worker did not fix the issue: after the restart it was assigned the same tasks
again, even though they were also running on another worker.
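The duplication described above can be checked mechanically from the worker
logs. The following is only a minimal sketch, not part of Kafka Connect: the
function names and the worker labels A/B/C are made up for illustration, and
the sample strings are the generation 641 lines from the logs above, trimmed to
the fields the script reads. It collects the `taskIds` each worker reports per
generation and flags any task that more than one worker holds.

```python
import re
from collections import defaultdict

# Matches "Joined group at generation N and got assignment" entries and
# captures the generation number and the contents of taskIds=[...].
# The match is case-sensitive, so revokedTaskIds=[...] is not picked up.
ASSIGNMENT_RE = re.compile(
    r"Joined group at generation (\d+) and got assignment: "
    r"Assignment\{.*?taskIds=\[(.*?)\]"
)


def parse_assignments(log_text):
    """Return {generation: set of task ids} for a single worker's log."""
    # The log lines are wrapped in the mail, so collapse all whitespace first.
    flat = " ".join(log_text.split())
    assignments = {}
    for generation, tasks in ASSIGNMENT_RE.findall(flat):
        task_ids = {t.strip() for t in tasks.split(",") if t.strip()}
        assignments[int(generation)] = task_ids
    return assignments


def find_duplicates(logs_by_worker):
    """Return {generation: {task id: sorted workers}} for tasks held twice."""
    owners = defaultdict(lambda: defaultdict(set))
    for worker, text in logs_by_worker.items():
        for generation, task_ids in parse_assignments(text).items():
            for task_id in task_ids:
                owners[generation][task_id].add(worker)
    duplicates = {}
    for generation, tasks in owners.items():
        shared = {t: sorted(ws) for t, ws in tasks.items() if len(ws) > 1}
        if shared:
            duplicates[generation] = shared
    return duplicates


# Generation 641 entries from the logs above, trimmed for brevity
# (leader, leaderUrl and offset fields removed; worker labels are mine).
SAMPLE_LOGS = {
    "A": "Joined group at generation 641 and got\n"
         "assignment: Assignment{error=0, connectorIds=[hdfs-sink,\n"
         "another-hdfs-sink, mqtt-source], taskIds=[hdfs-sink-0, another-hdfs-sink-0,\n"
         "mqtt-source-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}",
    "B": "Joined group at generation 641 and got\n"
         "assignment: Assignment{error=0,\n"
         "connectorIds=[another-hdfs-sink], taskIds=[another-hdfs-sink-0],\n"
         "revokedConnectorIds=[], revokedTaskIds=[], delay=0}",
    "C": "Joined group at generation 641 and got\n"
         "assignment: Assignment{error=0, connectorIds=[hdfs-sink],\n"
         "taskIds=[hdfs-sink-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0}",
}

if __name__ == "__main__":
    for generation, tasks in sorted(find_duplicates(SAMPLE_LOGS).items()):
        for task_id, workers in sorted(tasks.items()):
            print(f"generation {generation}: {task_id} assigned to {workers}")
```

On the trimmed generation 641 entries this flags `hdfs-sink-0` on workers A
and C and `another-hdfs-sink-0` on workers A and B. Note that it would not flag
generation 640, because workers B and C log no assignment for that generation
even though they may still be running their old tasks.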
I noticed that every worker logs itself as "Worker clientId=connect-1,
groupId=ingest-sources-cluster" and assumed that the shared client id might be
the cause. However, setting a different client id explicitly on each worker
doesn't fix the issue. (According to the documentation, the worker IP is part
of the worker identification anyway, so it makes sense that explicit client ids
had no effect.)
> Redundant task creation after worker fails to join a specific group generation
> ------------------------------------------------------------------------------
>
> Key: KAFKA-9184
> URL: https://issues.apache.org/jira/browse/KAFKA-9184
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.3.2
> Reporter: Konstantine Karantasis
> Assignee: Konstantine Karantasis
> Priority: Blocker
> Fix For: 2.3.2
>
>
> First reported here:
> https://stackoverflow.com/questions/58631092/kafka-connect-assigns-same-task-to-multiple-workers
> There seems to be an issue with task reassignment when a worker rejoins after
> an unsuccessful join request. The worker seems to be outside the group for a
> generation but when it joins again the same task is running in more than one
> worker