Liu, feel free to share your JIRA account ID in a separate email so that one
of the committers can add you to the project.
Then you or someone else will be able to assign this ticket to you.

I'll review the fix some time this week.

Thanks!
Konstantine

On Mon, Jul 22, 2019 at 5:13 AM Liu Luying <luying...@outlook.com> wrote:

> Hi Adam,
> I have already opened a JIRA ticket (KAFKA-8676,
> https://issues.apache.org/jira/browse/KAFKA-8676) and a PR (titled
> KAFKA-8676) for this.
>
> Best,
> Luying
> ________________________________
> From: Adam Bellemare <adam.bellem...@gmail.com>
> Sent: Friday, July 19, 2019 10:36
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: Stopping All Tasks When a New Connector Added
>
> Hi Luying
>
> Would you be willing to make a PR to address this? It seems that you have
> already done most of the work.
>
> Thanks
> Adam
>
> On Thu, Jul 18, 2019 at 11:00 PM Liu Luying <luying...@outlook.com> wrote:
>
> > Hi all,
> > I have noticed that Kafka Connect 2.3.0 stops all existing tasks and then
> > restarts all tasks, both new and existing, when a new connector is added
> > or a connector configuration is changed. However, I do not think this is
> > necessary. Only the new connector and its tasks need to be started, since
> > rebalancing can be applied to both running and suspended tasks.
> >
> > The problem lies in the
> > KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623
> > in KafkaConfigBackingStore.java). When record.key() starts with "commit-",
> > the tasks are being committed and the deferred tasks are processed. Some
> > new tasks are added to 'updatedTasks' (line 623 in
> > KafkaConfigBackingStore.java), and 'updatedTasks' is sent to
> > updateListener to complete the task configuration update (line 638 in
> > KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate()
> > function, the 'updatedTasks' are added to the member variable
> > 'taskConfigUpdates' of class DistributedHerder (line 1295 in
> > DistributedHerder.java).
> >
> > In another thread, 'taskConfigUpdates' is copied to
> > 'taskConfigUpdatesCopy' in updateConfigsWithIncrementalCooperative()
> > (line 445 in DistributedHerder.java). 'taskConfigUpdatesCopy' is
> > subsequently used in
> > processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in
> > DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy'
> > to find the connectors to stop (line 492 in DistributedHerder.java) and
> > finally gets the tasks to stop, which are all the tasks. The worker
> > thread does the actual stopping (line 499 in DistributedHerder.java).
> >
> > In the original code, all the tasks are added to 'updatedTasks' (line
> > 623 in KafkaConfigBackingStore.java), which means all the active
> > connectors are in the 'connectorsWhoseTasksToStop' set and all the tasks
> > are in the 'tasksToStop' list. This causes every task to be stopped and,
> > of course, subsequently restarted.
> >
> > So, adding only the 'deferred' tasks to 'updatedTasks' avoids the
> > unnecessary stops and restarts of the other tasks.
> >
> > Best,
> > Luying
> >
> >
>
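For anyone following along, the effect described in the quoted message can be
illustrated with a small sketch. It is a simplified model and not the Kafka
Connect code: the class TaskUpdateSketch, its methods, the "connector-N"
task-id strings and the example connector names are all assumptions made for
illustration. Only 'updatedTasks' and 'connectorsWhoseTasksToStop' are names
taken from the message.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; none of these types are Kafka Connect APIs.
class TaskUpdateSketch {

    // Behaviour described for the original code: every known task is
    // reported as updated, whether or not its configuration changed.
    static Set<String> allTasks(Map<String, List<String>> running,
                                Map<String, List<String>> deferred) {
        Set<String> updatedTasks = new LinkedHashSet<>();
        running.values().forEach(updatedTasks::addAll);
        deferred.values().forEach(updatedTasks::addAll);
        return updatedTasks;
    }

    // Proposed behaviour: only tasks with deferred configs are reported.
    static Set<String> deferredOnly(Map<String, List<String>> deferred) {
        Set<String> updatedTasks = new LinkedHashSet<>();
        deferred.values().forEach(updatedTasks::addAll);
        return updatedTasks;
    }

    // Any connector owning an updated task ends up in the stop set.
    // Assumes task ids have the form "<connector>-<index>".
    static Set<String> connectorsWhoseTasksToStop(Set<String> updatedTasks) {
        Set<String> connectors = new LinkedHashSet<>();
        for (String taskId : updatedTasks) {
            connectors.add(taskId.substring(0, taskId.lastIndexOf('-')));
        }
        return connectors;
    }

    public static void main(String[] args) {
        // One connector already running, one newly added (deferred).
        Map<String, List<String>> running =
            Map.of("file-source", List.of("file-source-0", "file-source-1"));
        Map<String, List<String>> deferred =
            Map.of("jdbc-sink", List.of("jdbc-sink-0"));

        // prints [file-source, jdbc-sink]
        System.out.println(connectorsWhoseTasksToStop(allTasks(running, deferred)));
        // prints [jdbc-sink]
        System.out.println(connectorsWhoseTasksToStop(deferredOnly(deferred)));
    }
}
```

In this model, reporting every task as updated pulls the already running
connector into the stop set, while reporting only the deferred tasks leaves it
untouched.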
