[
https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Randall Hauch resolved KAFKA-8676.
----------------------------------
Fix Version/s: (was: 2.3.0)
2.3.1
2.4.0
Reviewer: Konstantine Karantasis
Resolution: Fixed
Merged the proposed fix (https://github.com/apache/kafka/pull/7097) and
[~kkonstantine]'s unit test (https://github.com/apache/kafka/pull/7287).
> Avoid Stopping Unnecessary Connectors and Tasks
> ------------------------------------------------
>
> Key: KAFKA-8676
> URL: https://issues.apache.org/jira/browse/KAFKA-8676
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Affects Versions: 2.3.0
> Environment: centOS
> Reporter: Luying Liu
> Priority: Major
> Labels: ready-to-commit
> Fix For: 2.4.0, 2.3.1
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> When adding a new connector or changing a connector configuration, Kafka
> Connect 2.3.0 stops all existing tasks and then starts all tasks, both the
> new ones and the existing ones. This is unnecessary: only the new connector
> and tasks need to be started, because rebalancing can be applied to both
> running and suspended tasks. The following patch fixes this problem and
> starts only the new tasks and connectors.
> The problem lies in the
> KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in
> KafkaConfigBackingStore.java). When record.key() starts with "commit-", the
> tasks are being committed and the deferred tasks are processed. Some new
> tasks are added to 'updatedTasks' (line 623 in
> KafkaConfigBackingStore.java), and 'updatedTasks' is sent to
> updateListener to complete the task configuration update (line 638 in
> KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate()
> function, the 'updatedTasks' are added to the 'taskConfigUpdates' member
> variable of class DistributedHerder (line 1295 in
> DistributedHerder.java).
> In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy'
> in updateConfigsWithIncrementalCooperative() (line 445 in
> DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in
> processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in
> DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy' to
> find the connectors to stop (line 492 in DistributedHerder.java), and finally
> gets the tasks to stop, which are all the tasks. The worker thread does the
> actual stopping (line 499 in DistributedHerder.java).
> In the original code, all the tasks are added to the 'updatedTasks' (line 623
> in KafkaConfigBackingStore.java), which means all the active connectors are
> in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the
> 'tasksToStop' list. This causes the stops, and of course the subsequent
> restarts, of all the tasks.
> So, adding only the 'deferred' tasks to 'updatedTasks' avoids the
> unnecessary stops and restarts of existing tasks.
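
To illustrate the description above, here is a minimal, self-contained sketch
of the before/after behavior. It is not the actual Kafka Connect code: the
class name, the method names, and the "connector:index" string encoding of
task IDs are all assumptions made for illustration. Only the names
'updatedTasks', 'connectorsWhoseTasksToStop' and 'tasksToStop' come from the
description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the flow described in the issue; not Kafka Connect code.
public class DeferredTaskUpdateSketch {

    // Task IDs are modeled as "connector:index" strings (an assumption,
    // standing in for a real task ID type).
    static String connectorOf(String taskId) {
        return taskId.substring(0, taskId.lastIndexOf(':'));
    }

    // Behavior described for 2.3.0: on a "commit-" record, every known
    // task ends up in 'updatedTasks', not just the newly committed ones.
    static Set<String> updatedTasksBefore(Set<String> existing, Set<String> deferred) {
        Set<String> updatedTasks = new LinkedHashSet<>(existing);
        updatedTasks.addAll(deferred);
        return updatedTasks;
    }

    // Proposed behavior: only the deferred (newly committed) tasks are reported.
    static Set<String> updatedTasksAfter(Set<String> existing, Set<String> deferred) {
        return new LinkedHashSet<>(deferred);
    }

    // Herder side, as described: every connector that appears in the update
    // has its currently running tasks stopped.
    static List<String> tasksToStop(Set<String> updatedTasks, Set<String> runningTasks) {
        Set<String> connectorsWhoseTasksToStop = new LinkedHashSet<>();
        for (String task : updatedTasks) {
            connectorsWhoseTasksToStop.add(connectorOf(task));
        }
        List<String> tasksToStop = new ArrayList<>();
        for (String task : runningTasks) {
            if (connectorsWhoseTasksToStop.contains(connectorOf(task))) {
                tasksToStop.add(task);
            }
        }
        return tasksToStop;
    }

    public static void main(String[] args) {
        Set<String> running = new LinkedHashSet<>(Arrays.asList("jdbc:0", "jdbc:1", "s3:0"));
        Set<String> deferred = new LinkedHashSet<>(Arrays.asList("file:0"));

        // Before the fix: all running tasks are stopped.
        System.out.println(tasksToStop(updatedTasksBefore(running, deferred), running));
        // prints [jdbc:0, jdbc:1, s3:0]

        // After the fix: no running task is stopped for a brand-new connector.
        System.out.println(tasksToStop(updatedTasksAfter(running, deferred), running));
        // prints []
    }
}
```

In this sketch, adding the hypothetical "file" connector stops three running
tasks under the old behavior and none under the proposed one, which is the
effect the patch is aiming for.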