[ https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-8676.
----------------------------------
    Fix Version/s: 2.3.1, 2.4.0 (was: 2.3.0)
         Reviewer: Konstantine Karantasis
       Resolution: Fixed

Merged the proposed fix (https://github.com/apache/kafka/pull/7097) and [~kkonstantine]'s unit test (https://github.com/apache/kafka/pull/7287).

> Avoid Stopping Unnecessary Connectors and Tasks
> ------------------------------------------------
>
>                 Key: KAFKA-8676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8676
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 2.3.0
>         Environment: CentOS
>            Reporter: Luying Liu
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 2.4.0, 2.3.1
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When a new connector is added or a connector's configuration is changed,
> Kafka Connect 2.3.0 stops all existing tasks and then starts all tasks, both
> the new ones and the existing ones. This is unnecessary: only the new
> connector and its tasks need to be started, because the rebalance can be
> applied to both running and suspended tasks. The following patch fixes this
> problem so that only the new tasks and connectors are started.
>
> The problem lies in KafkaConfigBackingStore.ConsumeCallback.onCompletion()
> (line 623 in KafkaConfigBackingStore.java). When record.key() starts with
> "commit-", the tasks are being committed and the deferred tasks are
> processed. Some new tasks are added to 'updatedTasks' (line 623 in
> KafkaConfigBackingStore.java), and 'updatedTasks' is then passed to
> updateListener to complete the task configuration update (line 638 in
> KafkaConfigBackingStore.java). In updateListener.onTaskConfigUpdate(),
> 'updatedTasks' is added to 'taskConfigUpdates', a member variable of class
> DistributedHerder (line 1295 in DistributedHerder.java).
>
> In another thread, updateConfigsWithIncrementalCooperative() (line 445 in
> DistributedHerder.java) copies 'taskConfigUpdates' to
> 'taskConfigUpdatesCopy', which is subsequently used in
> processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in
> DistributedHerder.java). That function uses 'taskConfigUpdatesCopy' to find
> the connectors whose tasks must be stopped (line 492 in
> DistributedHerder.java) and from those derives the tasks to stop, which in
> this case are all of the tasks. The worker thread performs the actual stop
> (line 499 in DistributedHerder.java).
>
> In the original code, all tasks are added to 'updatedTasks' (line 623 in
> KafkaConfigBackingStore.java), so every active connector ends up in the
> 'connectorsWhoseTasksToStop' set and every task ends up in the
> 'tasksToStop' list. This causes every task to be stopped and then
> restarted.
>
> Adding only the 'deferred' tasks to 'updatedTasks' therefore avoids
> stopping and restarting tasks that did not change.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
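The chain described in the issue (commit record, then 'updatedTasks', then 'connectorsWhoseTasksToStop', then 'tasksToStop') can be illustrated with a small, single-threaded Java model. This is a sketch under stated assumptions, not the Kafka Connect code or the merged patch: TaskId, applyCommit, tasksToStop and scenario are hypothetical names, task configs are plain strings, and the listener-to-herder handoff between threads is left out. The onlyDeferred flag switches between the behaviour the report attributes to 2.3.0 (report every stored task) and the proposed fix (report only the deferred tasks from this commit).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of the KAFKA-8676 report. All names here are hypothetical
// stand-ins for illustration; they are not Kafka Connect APIs.
final class DeferredTaskUpdateSketch {

    // Stand-in for Kafka Connect's ConnectorTaskId (connector name + task number).
    record TaskId(String connector, int task) {}

    // Models handling a "commit-" record: the deferred task configs are moved into
    // the store, and the returned set plays the role of 'updatedTasks'.
    static Set<TaskId> applyCommit(Map<TaskId, String> store,
                                   Map<TaskId, String> deferred,
                                   boolean onlyDeferred) {
        store.putAll(deferred);
        // Reported 2.3.0 behaviour: every stored task counts as updated.
        // Proposed fix: only the tasks that arrived with this commit count.
        Set<TaskId> source = onlyDeferred ? deferred.keySet() : store.keySet();
        return new HashSet<>(source);
    }

    // Models the herder side: collect the connectors that own an updated task,
    // then stop every running task that belongs to one of those connectors.
    static List<TaskId> tasksToStop(Set<TaskId> updatedTasks, Collection<TaskId> running) {
        Set<String> connectorsWhoseTasksToStop = new HashSet<>();
        for (TaskId id : updatedTasks) {
            connectorsWhoseTasksToStop.add(id.connector());
        }
        List<TaskId> toStop = new ArrayList<>();
        for (TaskId id : running) {
            if (connectorsWhoseTasksToStop.contains(id.connector())) {
                toStop.add(id);
            }
        }
        return toStop;
    }

    // Scenario: connector "a" is running two tasks when a new connector "b" is added.
    static List<TaskId> scenario(boolean onlyDeferred) {
        Map<TaskId, String> store = new HashMap<>();
        List<TaskId> running = List.of(new TaskId("a", 0), new TaskId("a", 1));
        for (TaskId id : running) {
            store.put(id, "config for " + id);
        }
        Map<TaskId, String> deferred = Map.of(new TaskId("b", 0), "config for b-0");
        Set<TaskId> updated = applyCommit(store, deferred, onlyDeferred);
        return tasksToStop(updated, running);
    }

    public static void main(String[] args) {
        System.out.println("all stored tasks reported, stopping: " + scenario(false));
        System.out.println("only deferred tasks reported, stopping: " + scenario(true));
    }
}
```

In this model, adding connector "b" stops both of connector "a"'s running tasks when every stored task is reported, and stops nothing when only the deferred tasks are reported. Reconfiguring connector "a" would still stop "a"'s tasks under the fix, because its new task configs would be among the deferred ones.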