Re: Stopping All Tasks When a New Connector Added

2019-07-22 Thread Liu Luying
Hi Konstantine,
My JIRA username is Echolly, and my full name is Luying Liu.

Best,
Luying Liu


Re: Stopping All Tasks When a New Connector Added

2019-07-22 Thread Konstantine Karantasis
Liu, feel free to share your JIRA account ID in a separate email so that one of
the committers can add you to the project.
Then you or someone else will be able to assign this ticket to you.

I'll review the fix sometime this week.

Thanks!
Konstantine


Re: Stopping All Tasks When a New Connector Added

2019-07-21 Thread Liu Luying
Hi Adam,
I have already opened a JIRA ticket (KAFKA-8676,
https://issues.apache.org/jira/browse/KAFKA-8676) and a PR titled
KAFKA-8676 for this.

Best,
Luying


Re: Stopping All Tasks When a New Connector Added

2019-07-19 Thread Adam Bellemare
Hi Luying

Would you be willing to make a PR to address this? It seems that you have
already done most of the work.

Thanks
Adam

On Thu, Jul 18, 2019 at 11:00 PM Liu Luying  wrote:

> Hi all,
> I have noticed that Kafka Connect 2.3.0 stops all existing tasks and then
> starts all tasks again, both the new ones and the existing ones, whenever a
> new connector is added or a connector configuration is changed. However, I
> do not think this is necessary: only the new connector and its tasks need
> to be started, since rebalancing can be applied to both running and
> suspended tasks.
>
> The problem lies in the
> KafkaConfigBackingStore.ConsumeCallback.onCompletion() method (line 623 in
> KafkaConfigBackingStore.java). When record.key() starts with "commit-", the
> task configurations are being committed and the deferred tasks are
> processed: tasks are added to 'updatedTasks' (line 623 in
> KafkaConfigBackingStore.java), and 'updatedTasks' is passed to
> updateListener to complete the task configuration update (line 638 in
> KafkaConfigBackingStore.java). In updateListener.onTaskConfigUpdate(),
> 'updatedTasks' is added to the DistributedHerder member variable
> 'taskConfigUpdates' (line 1295 in DistributedHerder.java).
>
> In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy'
> in updateConfigsWithIncrementalCooperative() (line 445 in
> DistributedHerder.java). 'taskConfigUpdatesCopy' is then used by
> processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in
> DistributedHerder.java) to find the connectors whose tasks must be stopped
> (line 492 in DistributedHerder.java) and, from those, the tasks to stop,
> which turn out to be all the tasks. The worker thread does the actual
> stopping (line 499 in DistributedHerder.java).
>
> In the original code, all tasks are added to 'updatedTasks' (line 623 in
> KafkaConfigBackingStore.java), so every active connector ends up in the
> 'connectorsWhoseTasksToStop' set and every task ends up in the
> 'tasksToStop' list. This is what causes all tasks to be stopped and, of
> course, subsequently restarted.
>
> Adding only the 'deferred' tasks to 'updatedTasks' would therefore avoid
> stopping and restarting tasks unnecessarily.
>
> Best,
> Luying
>
>
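The root cause in the report above can be illustrated with a small, self-contained Java model. This is a simplified sketch under stated assumptions, not the Kafka Connect source: the class ConnectTaskStopSketch, the nested TaskId type, the taskConfigs map, and the methods commitTaskConfigs, tasksToStop and scenario are all hypothetical. Only the names updatedTasks, deferred, connectorsWhoseTasksToStop and tasksToStop are taken from the report. The flag reportOnlyDeferred switches between reporting every known task as updated (the behavior described) and reporting only the deferred tasks (the proposed change).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified, hypothetical model of the update path described in the report;
// this is not the Kafka Connect source code.
final class ConnectTaskStopSketch {

    // Hypothetical task identifier: connector name plus task index.
    static final class TaskId {
        final String connector;
        final int task;

        TaskId(String connector, int task) {
            this.connector = connector;
            this.task = task;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TaskId)) {
                return false;
            }
            TaskId other = (TaskId) o;
            return connector.equals(other.connector) && task == other.task;
        }

        @Override
        public int hashCode() {
            return Objects.hash(connector, task);
        }

        @Override
        public String toString() {
            return connector + "-" + task;
        }
    }

    // Backing-store side: on a "commit-" record, merge the deferred configs into
    // the full config map and return the tasks reported as updated.
    // reportOnlyDeferred == false mimics the behavior described in the report
    // (every known task is reported); true applies the proposed change.
    static List<TaskId> commitTaskConfigs(Map<TaskId, String> taskConfigs,
                                          Map<TaskId, String> deferred,
                                          boolean reportOnlyDeferred) {
        List<TaskId> updatedTasks = new ArrayList<>();
        taskConfigs.putAll(deferred);
        if (reportOnlyDeferred) {
            updatedTasks.addAll(deferred.keySet());
        } else {
            updatedTasks.addAll(taskConfigs.keySet());
        }
        deferred.clear(); // safe: updatedTasks is a separate list and keeps its entries
        return updatedTasks;
    }

    // Herder side: every running task that belongs to a connector with at least
    // one updated task is selected to be stopped.
    static List<TaskId> tasksToStop(List<TaskId> updatedTasks, Set<TaskId> runningTasks) {
        Set<String> connectorsWhoseTasksToStop = new LinkedHashSet<>();
        for (TaskId id : updatedTasks) {
            connectorsWhoseTasksToStop.add(id.connector);
        }
        List<TaskId> tasksToStop = new ArrayList<>();
        for (TaskId id : runningTasks) {
            if (connectorsWhoseTasksToStop.contains(id.connector)) {
                tasksToStop.add(id);
            }
        }
        return tasksToStop;
    }

    // Hypothetical scenario: connector "a" runs tasks a-0 and a-1, and a new
    // connector "b" commits the config for its single task b-0.
    static List<TaskId> scenario(boolean reportOnlyDeferred) {
        Map<TaskId, String> taskConfigs = new HashMap<>();
        Set<TaskId> running = new LinkedHashSet<>();
        for (int i = 0; i < 2; i++) {
            TaskId id = new TaskId("a", i);
            taskConfigs.put(id, "config for " + id);
            running.add(id);
        }
        Map<TaskId, String> deferred = new HashMap<>();
        deferred.put(new TaskId("b", 0), "config for b-0");
        List<TaskId> updatedTasks = commitTaskConfigs(taskConfigs, deferred, reportOnlyDeferred);
        return tasksToStop(updatedTasks, running);
    }

    public static void main(String[] args) {
        System.out.println("report all tasks:      stop " + scenario(false)); // stop [a-0, a-1]
        System.out.println("report deferred tasks: stop " + scenario(true));  // stop []
    }
}
```

When every known task is reported, both running tasks of connector "a" are selected for stopping even though only the new connector "b" changed. When only the deferred tasks are reported, no running task is stopped, because none of "b"'s tasks are running yet. The sketch leaves out everything else the real code path does, including the handoff between threads that the report describes.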