[ https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921714#comment-16921714 ]
ASF GitHub Bot commented on KAFKA-8676: --------------------------------------- rhauch commented on pull request #7287: MINOR: Add unit test for KAFKA-8676 to guard against unrequired task restarts URL: https://github.com/apache/kafka/pull/7287 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid Stopping Unnecessary Connectors and Tasks > ------------------------------------------------ > > Key: KAFKA-8676 > URL: https://issues.apache.org/jira/browse/KAFKA-8676 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 2.3.0 > Environment: centOS > Reporter: Luying Liu > Priority: Major > Labels: ready-to-commit > Fix For: 2.3.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > When adding a new connector or changing a connector configuration, Kafka > Connect 2.3.0 will stop all existing tasks and start all the tasks, including > the new tasks and the existing ones. However, it is not necessary at all. > Only the new connector and tasks need to be started. As the rebalancing can > be applied for both running and suspended tasks.The following patch will fix > this problem and starts only the new tasks and connectors. > The problem lies in the > KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in > KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the > tasks are being committed, and the deferred tasks are processed, Some new > tasks are added to the 'updatedTasks'(line 623 in > KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to > updateListener to complete the task configuration update(line 638 in > KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate() > function, the 'updatedTasks' are added to the member variable, > 'taskConfigUpdates', of class DistributedHerder(line 1295 in > DistributedHerder.java). > In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' > in updateConfigsWithIncrementalCooperative() (line 445 in > DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in > processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in > DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy' to > find connectors to stop(line 492 in DistributedHerder.java), and finally get > the tasks to stop, which are all the tasks. The worker thread does the actual > job of stop(line 499 in DistributedHerder.java). > In the original code, all the tasks are added to the 'updatedTasks' (line 623 > in KafkaConfigBackingStore.java), which means all the active connectors are > in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the > 'tasksToStop' list. This causes the stops, and of course the subsequent > restarts, of all the tasks. > So, adding only the 'deferred' tasks to the 'updatedTasks' can avoid the > stops and restarts of unnecessary tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)