[ 
https://issues.apache.org/jira/browse/KAFKA-8931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16936293#comment-16936293
 ] 

HeryLong commented on KAFKA-8931:
---------------------------------

Hi [~kkonstantine]:

 

Please see the following log, we just log the variable `updatedTasks` at 
[KafkaConfigBackingStore.java#L637|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L637]
 before `updateListener.onTaskConfigUpdate(updatedTasks);` , which will be 
restart next.

 

The first time, we add a new connector named 
`mysql__loghub_deve_mysql_d8_1_11568272307638` with `tasks.max=1`, the log is:

 
{code:java}
[2019-09-12 15:11:49,946] INFO ====yllu2==== 
deferred={mysql__loghub_deve_mysql_d8_1_11568272307638-0={connector.class=io.debezium.connector.mysql.MySqlConnector,
 database.user=root, database.server.id=223396, 
database.history.kafka.bootstrap.servers=172.31.96.165:9092,172.31.96.166:9092,172.31.96.167:9092,
 
database.history.kafka.topic=loghub_deve_mysql_d8_1_11568272307638.schema.changes,
 database.server.name=loghub_deve_mysql_d8_1_11568272307638, 
include.schema.changes=true, database.port=3306, 
table.whitelist=inventory.tunnel_monitor_record, 
key.converter.schemas.enable=true, 
task.class=io.debezium.connector.mysql.MySqlConnectorTask, 
snapshot.new.tables=parallel, database.hostname=172.31.96.165, 
database.password=debezium, value.converter.schemas.enable=true, 
name=mysql__loghub_deve_mysql_d8_1_11568272307638, 
database.blacklist=mysql,information_schema,performance_schema, 
value.converter=org.apache.kafka.connect.json.JsonConverter, 
key.converter=org.apache.kafka.connect.json.JsonConverter}}, 
taskConfigs.keySet=[mysql__loghub_deve_mysql_d6_2_21568259412646-0, 
mysql__loghub_deve_mysql_d5_1_11568197168903-0, 
mysql__loghub_deve_mysql_d5_1_21568197168903-0, 
mysql__loghub_deve_mysql_d3_1_21568197168903-0, 
mysql__loghub_deve_mysql_d6_1_21568258962192-0, 
mysql__loghub_deve_mysql_d4_1_21568253242922-0, 
mysql__loghub_deve_mysql_dd_1_11568197066662-0, 
mysql__loghub_deve_mysql_d3_1_11568197168903-0, 
mysql__loghub_deve_mysql_d4_1_11568253194763-0, 
mysql__loghub_deve_mysql_d6_1_11568258162071-0, 
mysql__loghub_deve_mysql_dd_1_21568197168903-0, 
mysql__loghub_deve_mysql_d7_1_21568271676731-0, 
mysql__loghub_deve_mysql_d7_1_11568271070252-0] 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:623)

[2019-09-12 15:11:49,947] INFO ====yllu2==== started 
updateListener.onTaskConfigUpdate=[mysql__loghub_deve_mysql_d6_2_21568259412646-0,
 mysql__loghub_deve_mysql_d5_1_11568197168903-0, 
mysql__loghub_deve_mysql_d5_1_21568197168903-0, 
mysql__loghub_deve_mysql_d3_1_21568197168903-0, 
mysql__loghub_deve_mysql_d6_1_21568258962192-0, 
mysql__loghub_deve_mysql_d4_1_21568253242922-0, 
mysql__loghub_deve_mysql_d8_1_11568272307638-0, 
mysql__loghub_deve_mysql_dd_1_11568197066662-0, 
mysql__loghub_deve_mysql_d3_1_11568197168903-0, 
mysql__loghub_deve_mysql_d4_1_11568253194763-0, 
mysql__loghub_deve_mysql_d6_1_11568258162071-0, 
mysql__loghub_deve_mysql_dd_1_21568197168903-0, 
mysql__loghub_deve_mysql_d7_1_21568271676731-0, 
mysql__loghub_deve_mysql_d7_1_11568271070252-0] 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:642)

{code}
 

You can see, task `mysql__loghub_deve_mysql_d8_1_11568272307638-0` is in the 
variable `updatedTasks`, and will be restart functionally, the other 
connectorTasks are the undetected side-effect of:  KAFKA-8869 as your mentioned 
before;

 

Secondly, we add another new connector named 
`mysql__loghub_deve_mysql_d8_1_21568272331420`, with `tasks.max=1' also, the 
log is:

 
{code:java}
[2019-09-12 15:12:12,782] INFO ====yllu2==== 
deferred={mysql__loghub_deve_mysql_d8_1_21568272331420-0={connector.class=io.debezium.connector.mysql.MySqlConnector,
 database.user=root, database.server.id=223397, 
database.history.kafka.bootstrap.servers=172.31.96.165:9092,172.31.96.166:9092,172.31.96.167:9092,
 
database.history.kafka.topic=loghub_deve_mysql_d8_1_21568272331420.schema.changes,
 database.server.name=loghub_deve_mysql_d8_1_21568272331420, 
include.schema.changes=true, database.port=3307, 
table.whitelist=inventory.addresses, key.converter.schemas.enable=true, 
task.class=io.debezium.connector.mysql.MySqlConnectorTask, 
snapshot.new.tables=parallel, database.hostname=172.31.96.165, 
database.password=debezium, value.converter.schemas.enable=true, 
name=mysql__loghub_deve_mysql_d8_1_21568272331420, 
database.blacklist=mysql,information_schema,performance_schema, 
value.converter=org.apache.kafka.connect.json.JsonConverter, 
key.converter=org.apache.kafka.connect.json.JsonConverter}}, 
taskConfigs.keySet=[mysql__loghub_deve_mysql_d6_2_21568259412646-0, 
mysql__loghub_deve_mysql_d5_1_11568197168903-0, 
mysql__loghub_deve_mysql_d5_1_21568197168903-0, 
mysql__loghub_deve_mysql_d3_1_21568197168903-0, 
mysql__loghub_deve_mysql_d6_1_21568258962192-0, 
mysql__loghub_deve_mysql_d4_1_21568253242922-0, 
mysql__loghub_deve_mysql_d8_1_11568272307638-0, 
mysql__loghub_deve_mysql_dd_1_11568197066662-0, 
mysql__loghub_deve_mysql_d3_1_11568197168903-0, 
mysql__loghub_deve_mysql_d4_1_11568253194763-0, 
mysql__loghub_deve_mysql_d6_1_11568258162071-0, 
mysql__loghub_deve_mysql_dd_1_21568197168903-0, 
mysql__loghub_deve_mysql_d7_1_21568271676731-0, 
mysql__loghub_deve_mysql_d7_1_11568271070252-0] 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:623)

[2019-09-12 15:12:12,783] INFO ====yllu2==== started 
updateListener.onTaskConfigUpdate=[mysql__loghub_deve_mysql_d6_2_21568259412646-0,
 mysql__loghub_deve_mysql_d5_1_11568197168903-0, 
mysql__loghub_deve_mysql_d5_1_21568197168903-0, 
mysql__loghub_deve_mysql_d3_1_21568197168903-0, 
mysql__loghub_deve_mysql_d6_1_21568258962192-0, 
mysql__loghub_deve_mysql_d4_1_21568253242922-0, 
mysql__loghub_deve_mysql_d8_1_11568272307638-0, 
mysql__loghub_deve_mysql_dd_1_11568197066662-0, 
mysql__loghub_deve_mysql_d3_1_11568197168903-0, 
mysql__loghub_deve_mysql_d8_1_21568272331420-0, 
mysql__loghub_deve_mysql_d4_1_11568253194763-0, 
mysql__loghub_deve_mysql_d6_1_11568258162071-0, 
mysql__loghub_deve_mysql_dd_1_21568197168903-0, 
mysql__loghub_deve_mysql_d7_1_21568271676731-0, 
mysql__loghub_deve_mysql_d7_1_11568271070252-0] 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:642)
{code}
Also, task `mysql__loghub_deve_mysql_d8_1_21568272331420-0` is in the variable 
`updatedTasks`, will be restart functionally.

 

But this time, the variable `updatedTasks` also contains task 
`mysql__loghub_deve_mysql_d8_1_11568272307638-0`, which without any 
configuration updated, but will be restarted always.

 

So, PR [#6850|https://github.com/apache/kafka/pull/6850] fix issue KAFKA-8449, 
but lead to restart all cached tasks of variable `updatedTasks`.--

 

And PR [#7348|https://github.com/apache/kafka/pull/7348] filter the variable 
`updatedTasks` with current reconfigured connector, which will just restart the 
current reconfigured connector tasks, not all cached tasks of variable 
`updatedTasks`;

 
{code:java}
[2019-09-16 10:07:15,515] INFO ====yllu2====updateTasks.filter 
before=[mysql__loghub_deve_mysql_d6_2_21568259412646-0, 
mysql__loghub_deve_mysql_d9_3_11568599609464-0, 
mysql__loghub_deve_mysql_d5_1_11568197168903-0, 
mysql__loghub_deve_mysql_d9_2_21568599562723-0, 
mysql__loghub_deve_mysql_d5_1_21568197168903-0, 
mysql__loghub_deve_mysql_d3_1_21568197168903-0, 
mysql__loghub_deve_mysql_d6_1_21568258962192-0, 
mysql__loghub_deve_mysql_d9_1_21568599408056-0, 
mysql__loghub_deve_mysql_d9_2_11568599542235-0, 
mysql__loghub_deve_mysql_d4_1_21568253242922-0, 
mysql__loghub_deve_mysql_d8_1_11568272307638-0, 
mysql__loghub_deve_mysql_dd_1_11568197066662-0, 
mysql__loghub_deve_mysql_d3_1_11568197168903-0, 
mysql__loghub_deve_mysql_d8_1_21568272331420-0, 
mysql__loghub_deve_mysql_d9_1_11568599358091-0, 
mysql__loghub_deve_mysql_d4_1_11568253194763-0, 
mysql__loghub_deve_mysql_d6_1_11568258162071-0, 
mysql__loghub_deve_mysql_dd_1_21568197168903-0, 
mysql__loghub_deve_mysql_d7_1_21568271676731-0, 
mysql__loghub_deve_mysql_d7_1_11568271070252-0, 
mysql__loghub_deve_mysql_d9_3_21568599634184-0] 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:642)[2019-09-16 
10:07:15,515] INFO ====yllu2====updateTasks.filter 
after=[mysql__loghub_deve_mysql_d9_3_21568599634184-0] 
(org.apache.kafka.connect.storage.KafkaConfigBackingStore:644)
{code}
 

 

 

> PR '#6850' fix issue 'KAFKA-8449', but lead to restart all cached tasks, 
> which conflict to the motivation of incremental cooperative rebalancing. 
> --------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8931
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8931
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.3.0
>            Reporter: HeryLong
>            Priority: Blocker
>         Attachments: 
> 0001-just-restart-current-reconfigurated-connectorTask-no-all-cacheds-ones.patch
>
>
> Recently, we are testing Kafka Connect 2.3.0, checking the incremental 
> cooperative rebalancing mechanism.
>  
> We found that when new connector was putted via REST endpoint, Kafka Connect 
> 2.3.0 will restart all cached tasks;
> And the issuse is  'KAFKA-8449', the PR is 
> '[#6850|https://github.com/apache/kafka/pull/6850]';
>  
> So, the result is, new connector putted will lead to all cached tasks 
> restarted, which conflict to the motivation of incremental cooperative 
> rebalancing. 
>  
> Should it just restart the new reconfigurated connector or tasks, not all 
> cached ones?
> And I have made a PR, please check it out,  
> '[#7348|https://github.com/apache/kafka/pull/7348]'



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to