Re: Stopping All Tasks When a New Connector Added
Hi Konstantine,

My JIRA username is Echolly, and my full name is Luying Liu.

Best,
Luying Liu

(In reply to Konstantine Karantasis, Monday, July 22, 2019 9:30)
Re: Stopping All Tasks When a New Connector Added
Liu, feel free to share your JIRA account ID in a separate email, so one of the committers can add you to the project. Then you or someone else will be able to assign this ticket to you.

I'll review the fix some time this week.

Thanks!
Konstantine

(In reply to Liu Luying, Mon, Jul 22, 2019 at 5:13 AM)
Re: Stopping All Tasks When a New Connector Added
Hi Adam,

I have already opened a JIRA ticket (KAFKA-8676, https://issues.apache.org/jira/browse/KAFKA-8676) and a PR (titled KAFKA-8676) for this.

Best,
Luying

(In reply to Adam Bellemare, Friday, July 19, 2019 10:36)
Re: Stopping All Tasks When a New Connector Added
Hi Luying,

Would you be willing to make a PR to address this? It seems that you have already done most of the work.

Thanks
Adam

(In reply to Liu Luying, Thu, Jul 18, 2019 at 11:00 PM)
Stopping All Tasks When a New Connector Added
Hi all,

I have noticed that Kafka Connect 2.3.0 stops all existing tasks and then starts all tasks, both the new ones and the existing ones, whenever a new connector is added or a connector configuration is changed. I do not think this is necessary. Only the new connector and tasks need to be started, since rebalancing can be applied to both running and suspended tasks.

The problem lies in the KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in KafkaConfigBackingStore.java). When record.key() starts with "commit-", the tasks are committed and the deferred tasks are processed. Some new tasks are added to 'updatedTasks' (line 623 in KafkaConfigBackingStore.java), and 'updatedTasks' is sent to updateListener to complete the task configuration update (line 638 in KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate() function, the 'updatedTasks' are added to 'taskConfigUpdates', a member variable of the DistributedHerder class (line 1295 in DistributedHerder.java).

In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' in updateConfigsWithIncrementalCooperative() (line 445 in DistributedHerder.java). 'taskConfigUpdatesCopy' is subsequently used in processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in DistributedHerder.java). This function uses 'taskConfigUpdatesCopy' to find the connectors to stop (line 492 in DistributedHerder.java) and, from those, the tasks to stop, which turn out to be all the tasks. The worker thread performs the actual stop (line 499 in DistributedHerder.java).

In the original code, all the tasks are added to 'updatedTasks' (line 623 in KafkaConfigBackingStore.java), which means that all the active connectors end up in the 'connectorsWhoseTasksToStop' set and all the tasks end up in the 'tasksToStop' list. This causes all the tasks to be stopped and, of course, subsequently restarted.

So, adding only the 'deferred' tasks to 'updatedTasks' avoids the unnecessary stops and restarts.

Best,
Luying
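To illustrate the chain described in the original message, here is a minimal Java sketch. It is a simplified model and not the actual Kafka Connect code: the class UpdatedTasksSketch, its methods, and the "connector-index" task id format are all hypothetical. Only the names 'updatedTasks', 'connectorsWhoseTasksToStop' and 'tasksToStop' are taken from the message. It assumes that a task is stopped whenever its connector owns at least one updated task, which is how the message describes the herder's behavior.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of the behavior described in the thread.
 * These are NOT the real KafkaConfigBackingStore / DistributedHerder classes.
 */
final class UpdatedTasksSketch {

    /** Assumption: task ids look like "connector-index", e.g. "a-0". */
    static String connectorOf(String taskId) {
        return taskId.substring(0, taskId.lastIndexOf('-'));
    }

    /** Behavior described as the original: every known task is reported as updated. */
    static Set<String> updatedTasksOriginal(Set<String> allTasks, Set<String> deferredTasks) {
        Set<String> updatedTasks = new TreeSet<>(allTasks);
        updatedTasks.addAll(deferredTasks);
        return updatedTasks;
    }

    /** Proposed behavior: only the deferred (newly committed) tasks are reported. */
    static Set<String> updatedTasksProposed(Set<String> deferredTasks) {
        return new TreeSet<>(deferredTasks);
    }

    /** Herder side: stop every running task whose connector has an updated task. */
    static Set<String> tasksToStop(Set<String> runningTasks, Set<String> updatedTasks) {
        Set<String> connectorsWhoseTasksToStop = new TreeSet<>();
        for (String task : updatedTasks) {
            connectorsWhoseTasksToStop.add(connectorOf(task));
        }
        Set<String> tasksToStop = new TreeSet<>();
        for (String task : runningTasks) {
            if (connectorsWhoseTasksToStop.contains(connectorOf(task))) {
                tasksToStop.add(task);
            }
        }
        return tasksToStop;
    }

    public static void main(String[] args) {
        // Connectors "a" and "b" are already running; connector "c" is newly added.
        Set<String> running = new TreeSet<>(Arrays.asList("a-0", "a-1", "b-0"));
        Set<String> deferred = new TreeSet<>(Arrays.asList("c-0"));
        Set<String> all = new TreeSet<>(running);
        all.addAll(deferred);

        // Original: all running tasks are stopped, prints [a-0, a-1, b-0]
        System.out.println(tasksToStop(running, updatedTasksOriginal(all, deferred)));
        // Proposed: no running task is stopped, prints []
        System.out.println(tasksToStop(running, updatedTasksProposed(deferred)));
    }
}
```

In this model, adding connector "c" stops the running tasks of "a" and "b" under the original behavior, while under the proposed behavior nothing that is already running is touched.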