Re: Stopping All Tasks When a New Connector Added

2019-07-22 Thread Liu Luying
Hi Konstantine,
My JIRA username is Echolly; my full name is Luying Liu.

Best,
Luying Liu

From: Konstantine Karantasis 
Sent: Monday, July 22, 2019 9:30
To: dev@kafka.apache.org 
Subject: Re: Stopping All Tasks When a New Connector Added

Liu, feel free to share your JIRA account ID in a separate email so that one of
the committers can add you to the project.
Then you, or someone else, will be able to assign this ticket to you.

I'll review the fix some time this week.

Thanks!
Konstantine

On Mon, Jul 22, 2019 at 5:13 AM Liu Luying  wrote:

> Hi Adam,
> I have already opened a JIRA ticket (KAFKA-8676
> <https://issues.apache.org/jira/browse/KAFKA-8676>) and a PR titled
> KAFKA-8676 for this.
>
> Best,
> Luying
> 
> From: Adam Bellemare 
> Sent: Friday, July 19, 2019 10:36
> To: dev@kafka.apache.org 
> Subject: Re: Stopping All Tasks When a New Connector Added
>
> Hi Luying
>
> Would you be willing to make a PR to address this? It seems that you have
> already done most of the work.
>
> Thanks
> Adam
>
> On Thu, Jul 18, 2019 at 11:00 PM Liu Luying  wrote:
>


Re: Stopping All Tasks When a New Connector Added

2019-07-22 Thread Konstantine Karantasis
Liu, feel free to share your JIRA account ID in a separate email so that one of
the committers can add you to the project.
Then you, or someone else, will be able to assign this ticket to you.

I'll review the fix some time this week.

Thanks!
Konstantine



Re: Stopping All Tasks When a New Connector Added

2019-07-21 Thread Liu Luying
Hi Adam,
I have already opened a JIRA ticket (KAFKA-8676
<https://issues.apache.org/jira/browse/KAFKA-8676>) and a PR titled KAFKA-8676
for this.

Best,
Luying



Re: Stopping All Tasks When a New Connector Added

2019-07-19 Thread Adam Bellemare
Hi Luying

Would you be willing to make a PR to address this? It seems that you have
already done most of the work.

Thanks
Adam



Stopping All Tasks When a New Connector Added

2019-07-18 Thread Liu Luying
Hi all,
I have noticed that Kafka Connect 2.3.0 stops all existing tasks and then
restarts every task, new and existing alike, whenever a new connector is added
or a connector configuration is changed. I do not think this is necessary: only
the new (or reconfigured) connector and its tasks need to be started, since the
rebalance can handle both running and suspended tasks.

The problem lies in KafkaConfigBackingStore.ConsumeCallback.onCompletion()
(line 623 in KafkaConfigBackingStore.java). When record.key() starts with
"commit-", the tasks are being committed and the deferred task configurations
are applied. Task IDs are then added to 'updatedTasks' (line 623 in
KafkaConfigBackingStore.java), and 'updatedTasks' is passed to updateListener
to complete the task configuration update (line 638 in
KafkaConfigBackingStore.java). In updateListener.onTaskConfigUpdate(),
'updatedTasks' is added to the DistributedHerder member variable
'taskConfigUpdates' (line 1295 in DistributedHerder.java).
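To make the hand-off concrete, here is a minimal Java sketch of the path from the listener callback into the herder's pending set. It is a simplified model, not the Kafka source: 'UpdateListener', 'HerderModel' and 'drainTaskConfigUpdates()' are hypothetical names, task IDs are plain strings, and only the shape (a callback that adds to 'taskConfigUpdates', and a herder-side copy) is taken from the message. The copy step the herder thread performs later is included so the two threads can be seen sharing one lock.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the config-store -> herder hand-off; not Kafka code.
class TaskConfigHandOff {

    // Stand-in for the listener the config store calls after a "commit-" record.
    interface UpdateListener {
        void onTaskConfigUpdate(Collection<String> updatedTasks);
    }

    // Stand-in for the herder: accumulates pending updates in 'taskConfigUpdates'.
    static class HerderModel implements UpdateListener {
        private final Set<String> taskConfigUpdates = new HashSet<>();

        // Intended to run on the config-log consumer thread.
        @Override
        public synchronized void onTaskConfigUpdate(Collection<String> updatedTasks) {
            taskConfigUpdates.addAll(updatedTasks);
        }

        // Intended to run on the herder thread: snapshot and clear under the same
        // lock, so an update arriving concurrently is neither lost nor handled twice.
        synchronized Set<String> drainTaskConfigUpdates() {
            Set<String> taskConfigUpdatesCopy = new HashSet<>(taskConfigUpdates);
            taskConfigUpdates.clear();
            return taskConfigUpdatesCopy;
        }
    }

    public static void main(String[] args) {
        HerderModel herder = new HerderModel();
        // Two commits arrive before the herder thread gets around to them.
        herder.onTaskConfigUpdate(Set.of("orders-0", "orders-1"));
        herder.onTaskConfigUpdate(Set.of("audit-0"));
        System.out.println(herder.drainTaskConfigUpdates().size()); // 3
        System.out.println(herder.drainTaskConfigUpdates().size()); // 0
    }
}
```

Note that whatever the store reports lands in this set unchanged, so if the store over-reports task IDs, the herder has no way to tell which ones actually changed.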

In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' in
updateConfigsWithIncrementalCooperative() (line 445 in DistributedHerder.java).
'taskConfigUpdatesCopy' is then passed to
processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in
DistributedHerder.java), which uses it to find the connectors whose tasks must
be stopped (line 492 in DistributedHerder.java) and, from those, the tasks to
stop, which turn out to be all the tasks. The worker performs the actual stop
(line 499 in DistributedHerder.java).
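The selection step can be sketched as follows. This is a simplified, hypothetical Java model, assuming the herder picks every connector that appears in the snapshot and then every locally assigned task of those connectors; 'TaskId', 'tasksToStop()' and the 'localTasks' parameter are illustrative names, while 'taskConfigUpdatesCopy', 'connectorsWhoseTasksToStop' and 'tasksToStop' come from the message.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model of the herder's stop selection; not DistributedHerder itself.
class TasksToStopSketch {

    // Minimal stand-in for a task id: connector name plus task index.
    static final class TaskId {
        final String connector;
        final int task;

        TaskId(String connector, int task) {
            this.connector = connector;
            this.task = task;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TaskId)) return false;
            TaskId other = (TaskId) o;
            return connector.equals(other.connector) && task == other.task;
        }

        @Override
        public int hashCode() {
            return Objects.hash(connector, task);
        }

        @Override
        public String toString() {
            return connector + "-" + task;
        }
    }

    static List<TaskId> tasksToStop(Collection<TaskId> taskConfigUpdatesCopy,
                                    Collection<TaskId> localTasks) {
        // Every connector mentioned in the snapshot, whichever of its tasks changed.
        Set<String> connectorsWhoseTasksToStop = taskConfigUpdatesCopy.stream()
                .map(id -> id.connector)
                .collect(Collectors.toSet());
        // Every task on this worker that belongs to one of those connectors.
        return localTasks.stream()
                .filter(id -> connectorsWhoseTasksToStop.contains(id.connector))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TaskId> local = List.of(new TaskId("a", 0), new TaskId("a", 1), new TaskId("b", 0));
        // Only connector "b" appears in the snapshot, so only b-0 is stopped.
        System.out.println(tasksToStop(List.of(new TaskId("b", 0)), local)); // [b-0]
    }
}
```

Because the selection works per connector, the size of the stop list depends entirely on which connectors appear in the snapshot; a snapshot that names every connector stops every local task.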

In the original code, all tasks are added to 'updatedTasks' (line 623 in
KafkaConfigBackingStore.java), so every active connector ends up in the
'connectorsWhoseTasksToStop' set and every task ends up in the 'tasksToStop'
list. As a result, all tasks are stopped and then restarted.
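A small Java model of this behaviour, under stated assumptions: it is not the real KafkaConfigBackingStore, task IDs are strings of the form "<connector>-<index>", the config maps hold placeholder values, and 'onCommit()' and 'connectorsWhoseTasksToStop()' are hypothetical helpers. The only detail taken from the message is that every known task ID is reported as updated when a commit is processed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the original commit handling; not Kafka code.
class OriginalCommitSketch {

    // Assumes task ids look like "<connector>-<index>".
    static String connectorOf(String taskId) {
        return taskId.substring(0, taskId.lastIndexOf('-'));
    }

    // Applies one connector's deferred task configs on its "commit-" record and
    // returns what is reported to the listener as 'updatedTasks'.
    static Set<String> onCommit(Map<String, Map<String, String>> taskConfigs,
                                Map<String, Map<String, String>> deferred) {
        taskConfigs.putAll(deferred);
        // Reports every known task, not only the ones just committed.
        return new HashSet<>(taskConfigs.keySet());
    }

    // Connectors the herder would then stop and restart (sorted for display).
    static Set<String> connectorsWhoseTasksToStop(Set<String> updatedTasks) {
        Set<String> connectors = new TreeSet<>();
        for (String taskId : updatedTasks) {
            connectors.add(connectorOf(taskId));
        }
        return connectors;
    }

    public static void main(String[] args) {
        // Two connectors are already running (placeholder configs).
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("orders-0", Map.of("topic", "orders"));
        taskConfigs.put("users-0", Map.of("topic", "users"));
        // A new connector "audit" is committed with one task.
        Map<String, Map<String, String>> deferred = Map.of("audit-0", Map.of("topic", "audit"));
        Set<String> updatedTasks = onCommit(taskConfigs, deferred);
        System.out.println(connectorsWhoseTasksToStop(updatedTasks)); // [audit, orders, users]
    }
}
```

Adding one connector drags the two unrelated connectors into the stop set, which matches the restart-everything symptom described above.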

Adding only the deferred tasks to 'updatedTasks' therefore avoids stopping and
restarting tasks unnecessarily.
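The proposed change can be modelled the same way. Again this is a hypothetical sketch rather than the patch itself: it keeps the same simplified string task IDs and placeholder configs, and differs from a report-everything version only in reporting the keys of the deferred map. It does not model a connector whose task count shrinks.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the proposed commit handling; not Kafka code.
class FixedCommitSketch {

    // Assumes task ids look like "<connector>-<index>".
    static String connectorOf(String taskId) {
        return taskId.substring(0, taskId.lastIndexOf('-'));
    }

    // Applies the deferred configs but reports only the deferred task ids.
    static Set<String> onCommit(Map<String, Map<String, String>> taskConfigs,
                                Map<String, Map<String, String>> deferred) {
        taskConfigs.putAll(deferred);
        return new HashSet<>(deferred.keySet());
    }

    // Connectors the herder would then stop and restart (sorted for display).
    static Set<String> connectorsWhoseTasksToStop(Set<String> updatedTasks) {
        Set<String> connectors = new TreeSet<>();
        for (String taskId : updatedTasks) {
            connectors.add(connectorOf(taskId));
        }
        return connectors;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        taskConfigs.put("orders-0", Map.of("topic", "orders"));
        taskConfigs.put("users-0", Map.of("topic", "users"));

        // Adding a new connector "audit": only its own task is reported.
        Set<String> added = onCommit(taskConfigs, Map.of("audit-0", Map.of("topic", "audit")));
        System.out.println(connectorsWhoseTasksToStop(added)); // [audit]

        // Reconfiguring "orders": only the orders task is reported.
        Set<String> changed = onCommit(taskConfigs, Map.of("orders-0", Map.of("topic", "orders-v2")));
        System.out.println(connectorsWhoseTasksToStop(changed)); // [orders]
    }
}
```

Since the herder selects tasks per connector, narrowing 'updatedTasks' to the deferred entries is enough to keep unrelated connectors running.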

Best,
Luying