[https://issues.apache.org/jira/browse/KAFKA-10763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Shao Wang updated KAFKA-10763:
------------------------------
    Description: 
In our production environment, after starting two Kafka Connect workers, the
group leader bounces between worker1 and worker2 during the first couple of
minutes, and many tasks on worker2 fail with a "Task already exists in this
worker" exception.

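The sequence of events on worker2 (hostname: sinkdp2), where "task 1" is
dp-hive-sink-connector-dptask_475_22-0 in the logs below:

gen3: task 1 assigned and started
gen4: task 1 assigned
gen5: task 1 assigned
gen6: stopping and removing task 1 skipped because the previous rebalance was
      unresolved; task 1 then revoked
gen7: task 1 assigned again; starting it fails with a "Task already exists"
      error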
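
To illustrate why skipping the stop step at gen6 leads to the error at gen7,
here is a minimal, hypothetical Java model of a worker that keeps running tasks
in a map and rejects a start request for an id it already holds. The names
SimpleWorker, onRevoked, stopTask and rebalanceResolved are illustrative
assumptions, not the actual Kafka Connect Worker/DistributedHerder code, and
IllegalStateException stands in for Connect's ConnectException so the sketch
has no dependencies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the failure sequence above.
 * None of these names are real Kafka Connect classes or methods.
 */
public class TaskAlreadyExistsDemo {

    static class SimpleWorker {
        // Registry of started tasks, keyed by task id.
        private final Map<String, String> runningTasks = new HashMap<>();
        // Assumed flag: false when the previous rebalance did not complete.
        boolean rebalanceResolved = true;

        void startTask(String taskId) {
            // Refuse to start a task whose id is still registered.
            if (runningTasks.containsKey(taskId)) {
                throw new IllegalStateException(
                        "Task already exists in this worker: " + taskId);
            }
            runningTasks.put(taskId, "RUNNING");
        }

        void stopTask(String taskId) {
            runningTasks.remove(taskId);
        }

        // Revocation handler that only stops tasks if the last rebalance resolved.
        void onRevoked(List<String> revokedTasks) {
            if (rebalanceResolved) {
                revokedTasks.forEach(this::stopTask);
            }
            // Otherwise the revoked tasks stay in runningTasks.
        }

        boolean isRunning(String taskId) {
            return runningTasks.containsKey(taskId);
        }
    }

    public static void main(String[] args) {
        String task = "dp-hive-sink-connector-dptask_475_22-0";
        SimpleWorker worker2 = new SimpleWorker();

        worker2.startTask(task);              // gen3: assigned and started
        worker2.rebalanceResolved = false;    // gen6: previous rebalance unresolved
        worker2.onRevoked(List.of(task));     // stop skipped, task stays registered

        try {
            worker2.startTask(task);          // gen7: assigned again
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, running main prints the same message worker2 logs at gen7. If
rebalanceResolved is true when onRevoked runs, the task is removed and the
second startTask succeeds, which is the behavior one would expect after a
revocation.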

 

Log from worker1 (hostname: sinkdp1):
{code:java}
03:36:07,340 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:36:10,460 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 1 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842', 
leaderUrl='http://sinkdp1:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:36:10,694 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Starting task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-5][DistributedHerder.java:1073]
03:36:10,979 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0 
with version 0.14.0-SNAPSHOT of type 
com.datapipeline.sink.connector.hive.HiveConnectorTask   
[pool-9-thread-5][Worker.java:426]
03:36:37,692 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:36:37,806 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-5][Worker.java:702]
03:40:09,721 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Finished stopping tasks in preparation for rebalance   
[DistributedHerder-connect-1][DistributedHerder.java:1502]
03:40:09,722 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 2 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842', 
leaderUrl='http://sinkdp1:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_599_20, 
dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
03:40:09,722 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:41:10,650 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Wasn't unable to resume work after last rebalance, can skip stopping connectors 
and tasks   [DistributedHerder-connect-1][DistributedHerder.java:1517]
03:41:10,650 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 4 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', 
leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[], taskIds=[], 
revokedConnectorIds=[dp-hive-sink-connector-dptask_599_20, 
dp-tidb-connector-dptask
03:41:10,651 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:42:10,815 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 5 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', 
leaderUrl='http://sinkdp1:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:42:10,953 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Starting task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-8][DistributedHerder.java:1073]
03:42:10,953 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0 
with version 0.14.0-SNAPSHOT of type 
com.datapipeline.sink.connector.hive.HiveConnectorTask   
[pool-9-thread-8][Worker.java:426]
03:42:29,429 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:43:28,336 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-2][Worker.java:702]
03:46:05,804 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Finished stopping tasks in preparation for rebalance   
[DistributedHerder-connect-1][DistributedHerder.java:1502]
03:46:05,806 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 6 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', 
leaderUrl='http://sinkdp1:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_599_20, 
dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
03:46:05,806 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:47:06,564 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Wasn't unable to resume work after last rebalance, can skip stopping connectors 
and tasks   [DistributedHerder-connect-1][DistributedHerder.java:1517]
{code}
Log from worker2 (hostname: sinkdp2):
{code:java}
03:36:35,984 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:36:37,780 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 2 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842', 
leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   
[Dist
03:37:40,789 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:37:40,916 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 3 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', 
leaderUrl='http://sinkdp2:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:37:41,151 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Starting task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-1][DistributedHerder.java:1073]
03:37:41,507 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0 
with version 0.14.0-SNAPSHOT of type 
com.datapipeline.sink.connector.hive.HiveConnectorTask   
[pool-9-thread-1][Worker.java:426]
03:40:13,254 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:42:27,376 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Finished stopping tasks in preparation for rebalance   
[DistributedHerder-connect-1][DistributedHerder.java:1502]
03:42:27,377 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 4 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', 
leaderUrl='http://sinkdp2:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:42:27,378 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:43:28,190 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Wasn't unable to resume work after last rebalance, can skip stopping connectors 
and tasks   [DistributedHerder-connect-1][DistributedHerder.java:1517]
03:43:28,191 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 6 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', 
leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], 
revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-d
03:43:28,191 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:44:28,358 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 7 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d', 
leaderUrl='http://sinkdp2:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:44:28,692 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Starting task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-7][DistributedHerder.java:1073]
kafka.connect.errors.ConnectException: Task already exists in this worker: 
dp-hive-sink-connector-dptask_475_22-0
03:46:07,401 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:48:07,024 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Finished stopping tasks in preparation for rebalance   
[DistributedHerder-connect-1][DistributedHerder.java:1502]
03:48:07,246 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 8 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d', 
leaderUrl='http://sinkdp2:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
03:48:07,246 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:49:07,446 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Wasn't unable to resume work after last rebalance, can skip stopping connectors 
and tasks   [DistributedHerder-connect-1][DistributedHerder.java:1517]
03:49:07,446 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 10 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-2a498edc-a517-457b-b982-c84cc9ff4521', 
leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], 
revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-
03:49:07,447 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Rebalance started   [DistributedHerder-connect-1][WorkerCoordinator.java:233]
03:50:07,677 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Joined group at generation 11 with protocol version 1 and got assignment: 
Assignment{error=0, leader='connect-1-d0be6a23-8dfa-4289-a818-8a655effa48d', 
leaderUrl='http://sinkdp2:8083/', offset=6457, 
connectorIds=[dp-hive-sink-connector-dptask_475_22, 
dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpt
03:50:08,079 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] 
Starting task dp-hive-sink-connector-dptask_475_22-0   
[pool-9-thread-3][DistributedHerder.java:1073]
kafka.connect.errors.ConnectException: Task already exists in this worker: 
dp-hive-sink-connector-dptask_475_22-0
{code}
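
Because the log lines above are wrapped and interleaved, the leader bouncing is
easier to see as a list of (generation, leaderUrl) pairs. The following is a
hypothetical Java helper (LeaderTimeline is an illustrative name, not part of
Kafka) that extracts those pairs from raw worker log text with a regular
expression. It assumes every "Joined group at generation" entry carries its own
leaderUrl field, as the entries above do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: lists (generation, leaderUrl) pairs from worker logs. */
public class LeaderTimeline {

    // \s+ tolerates line wrapping; DOTALL lets .*? span wrapped lines
    // up to the first leaderUrl after each "Joined group" message.
    private static final Pattern JOINED = Pattern.compile(
            "Joined\\s+group\\s+at\\s+generation\\s+(\\d+).*?leaderUrl='([^']*)'",
            Pattern.DOTALL);

    public static final class Join {
        public final int generation;
        public final String leaderUrl;

        Join(int generation, String leaderUrl) {
            this.generation = generation;
            this.leaderUrl = leaderUrl;
        }

        @Override
        public String toString() {
            return "gen " + generation + " -> " + leaderUrl;
        }
    }

    public static List<Join> parse(String log) {
        List<Join> joins = new ArrayList<>();
        Matcher m = JOINED.matcher(log);
        while (m.find()) {
            joins.add(new Join(Integer.parseInt(m.group(1)), m.group(2)));
        }
        return joins;
    }

    public static void main(String[] args) {
        // Two entries copied from the worker2 log above.
        String sample = String.join("\n",
            "03:37:40,916 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] ",
            "Joined group at generation 3 with protocol version 1 and got assignment: ",
            "Assignment{error=0, leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4', ",
            "leaderUrl='http://sinkdp2:8083/', offset=6457, ",
            "03:43:28,191 [INFO ] [Worker clientId=connect-1, groupId=group_connect_sink_dp] ",
            "Joined group at generation 6 with protocol version 1 and got assignment: ",
            "Assignment{error=0, leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17', ",
            "leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], ");

        for (Join j : parse(sample)) {
            System.out.println(j);
        }
        // gen 3 -> http://sinkdp2:8083/
        // gen 6 -> http://sinkdp1:8083/
    }
}
```

Feeding each worker's full log through parse should show the leaderUrl
alternating between sinkdp1 and sinkdp2 across generations.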

> Task already exists error on same worker due to skip removal of tasks
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-10763
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10763
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.3.0
>            Reporter: Shao Wang
>            Priority: Major
> groupId=group_connect_sink_dp] Joined group at generation 8 with protocol 
> version 1 and got assignment: Assignment{error=0, 
> leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d', 
> leaderUrl='http://sinkdp2:8083/', offset=6457, 
> connectorIds=[dp-hive-sink-connector-dptask_475_22, 
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:48:07,246 [INFO ] [Worker clientId=connect-1, 
> groupId=group_connect_sink_dp] Rebalance started   
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:49:07,446 [INFO ] [Worker clientId=connect-1, 
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last 
> rebalance, can skip stopping connectors and tasks   
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:49:07,446 [INFO ] [Worker clientId=connect-1, 
> groupId=group_connect_sink_dp] Joined group at generation 10 with protocol 
> version 1 and got assignment: Assignment{error=0, 
> leader='connect-1-2a498edc-a517-457b-b982-c84cc9ff4521', 
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22, 
> dp-hive-sink-connector-
> 03:49:07,447 [INFO ] [Worker clientId=connect-1, 
> groupId=group_connect_sink_dp] Rebalance started   
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:50:07,677 [INFO ] [Worker clientId=connect-1, 
> groupId=group_connect_sink_dp] Joined group at generation 11 with protocol 
> version 1 and got assignment: Assignment{error=0, 
> leader='connect-1-d0be6a23-8dfa-4289-a818-8a655effa48d', 
> leaderUrl='http://sinkdp2:8083/', offset=6457, 
> connectorIds=[dp-hive-sink-connector-dptask_475_22, 
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpt
> 03:50:08,079 [INFO ] [Worker clientId=connect-1, 
> groupId=group_connect_sink_dp] Starting task 
> dp-hive-sink-connector-dptask_475_22-0   
> [pool-9-thread-3][DistributedHerder.java:1073]
> kafka.connect.errors.ConnectException: Task already exists in this worker: 
> dp-hive-sink-connector-dptask_475_22-0
> {code}
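To make the failure mode easier to follow, here is a minimal, hypothetical Java model of the per-task sequence summarized in the description: gen3 start, gen6 revoke with stopping skipped because the rebalance is unresolved, gen7 start again. It is not Kafka Connect code. SimulatedWorker, handleRevocation and replay are illustrative names only.

The model rests on two assumptions. First, the worker tracks running task IDs and rejects a second start of an ID it already holds, which is what the "Task already exists in this worker" message suggests. Second, a revocation received while the previous rebalance is treated as unresolved leaves the task running.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified model of the log sequence; NOT Kafka Connect's real classes. */
public class TaskAlreadyExistsSketch {

    /** Stand-in for a worker's registry of running tasks (hypothetical). */
    static class SimulatedWorker {
        private final Set<String> runningTasks = new HashSet<>();

        void startTask(String taskId) {
            // Set.add returns false if the ID is already present: reject the duplicate start
            if (!runningTasks.add(taskId)) {
                throw new IllegalStateException("Task already exists in this worker: " + taskId);
            }
        }

        void stopTask(String taskId) {
            runningTasks.remove(taskId);
        }
    }

    /**
     * Hypothetical revocation handling: when the previous rebalance is treated as
     * unresolved, stopping is skipped (cf. "can skip stopping connectors and tasks").
     */
    static void handleRevocation(SimulatedWorker worker, String taskId, boolean skipStop) {
        if (skipStop) {
            return; // revoked task is left running in the worker
        }
        worker.stopTask(taskId);
    }

    /** Replays the assign/revoke/assign sequence; returns the error message, or null. */
    static String replay(boolean skipStopOnRevoke) {
        SimulatedWorker worker = new SimulatedWorker();
        String task = "dp-hive-sink-connector-dptask_475_22-0";
        worker.startTask(task);                            // gen 3: assigned and started
        handleRevocation(worker, task, skipStopOnRevoke);  // gen 6: revoked
        try {
            worker.startTask(task);                        // gen 7: assigned and started again
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay(true));   // skipped stop: duplicate-start error message
        System.out.println(replay(false));  // normal stop: prints "null"
    }
}
```

With skipStopOnRevoke set to true, the second start fails the same way the gen7 and gen11 log lines do. With it set to false, the revoked task is removed first and the restart succeeds. This suggests the revocation must still stop the task even when the rebalance is otherwise considered unresolved, though the real fix in DistributedHerder may look different.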



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
