[ 
https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8676.
----------------------------------
    Fix Version/s:     (was: 2.3.0)
                   2.3.1
                   2.4.0
         Reviewer: Konstantine Karantasis
       Resolution: Fixed

Merged the proposed fix (https://github.com/apache/kafka/pull/7097) and 
[~kkonstantine]'s unit test (https://github.com/apache/kafka/pull/7287).

> Avoid Stopping Unnecessary Connectors and Tasks 
> ------------------------------------------------
>
>                 Key: KAFKA-8676
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8676
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 2.3.0
>         Environment: centOS
>            Reporter: Luying Liu
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 2.4.0, 2.3.1
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> When adding a new connector or changing a connector configuration, Kafka 
> Connect 2.3.0 will stop all existing tasks and start all the tasks, including 
> the new tasks and the existing ones. However, it is not necessary at all. 
> Only the new connector and tasks need to be started. As the rebalancing can 
> be applied for both running and suspended tasks.The following patch will fix 
> this problem and starts only the new tasks and connectors.
> The problem lies in the 
> KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in 
> KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the 
> tasks are being committed, and the deferred tasks are processed, Some new 
> tasks are added to the 'updatedTasks'(line 623 in 
> KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to 
> updateListener to complete the task configuration update(line 638 in 
> KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate() 
> function, the  'updatedTasks' are added to the member variable, 
> 'taskConfigUpdates', of class DistributedHerder(line 1295 in 
> DistributedHerder.java).
> In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' 
> in updateConfigsWithIncrementalCooperative() (line 445 in 
> DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in 
> processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in 
> DistributedHerder.java). This function then uses  'taskConfigUpdatesCopy' to 
> find connectors to stop(line 492 in DistributedHerder.java), and finally get 
> the tasks to stop, which are all the tasks. The worker thread does the actual 
> job of stop(line 499 in DistributedHerder.java). 
> In the original code, all the tasks are added to the 'updatedTasks' (line 623 
> in KafkaConfigBackingStore.java), which means all the active connectors are 
> in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the 
> 'tasksToStop' list. This causes the stops, and of course the subsequent 
> restarts, of all the tasks. 
> So, adding only the 'deferred' tasks to the  'updatedTasks' can avoid the 
> stops and restarts of unnecessary tasks.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to