Hi Robin

I went through the link you provided, but it is not helpful in my case.
Apart from this, I do not understand why the tasks are divided in the
pattern below when the connectors are first registered, which is the
expected behavior. Is there any parameter we can set in the worker
properties file to control the task assignment strategy, like the range
assignor or round robin for a consumer group?

Connector REST status API result after first registration:

{
  "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.5:8080"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:8078"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:8080"
    }
  ],
  "type": "sink"
}

and

{
  "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.4:8078"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:8078"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:8080"
    }
  ],
  "type": "sink"
}
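
As an aside, the distribution in a status response like the ones above can
be tallied with a short script. This is only a sketch in Python, not part of
Kafka Connect: the helper names tasks_per_worker and is_spread are made up
for illustration, and the payload is a trimmed copy of the first status
result.

```python
import json
from collections import Counter

# Trimmed copy of the first status result (first registration).
STATUS_JSON = """
{
  "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.5:8080"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.4:8078"},
    {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.5:8080"}
  ],
  "type": "sink"
}
"""


def tasks_per_worker(status: dict) -> Counter:
    """Count how many tasks of one connector run on each worker."""
    return Counter(task["worker_id"] for task in status["tasks"])


def is_spread(status: dict) -> bool:
    """Return True if the connector's tasks run on more than one worker."""
    return len(tasks_per_worker(status)) > 1


status = json.loads(STATUS_JSON)
counts = tasks_per_worker(status)
for worker, n in sorted(counts.items()):
    print(f"{status['name']}: {worker} -> {n} task(s)")
print("spread across workers:", is_spread(status))
```

Running the same helpers on a status response where every task reports the
same worker_id would make is_spread return False.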


But when I stop the second worker process, wait for the
scheduled.rebalance.max.delay.ms period (i.e. 5 minutes) to elapse, and
then start the process again, the result is different.

{
  "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.5:8080"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:8080"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.5:8080"
    }
  ],
  "type": "sink"
}

and

{
  "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.0.4:8078"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:8078"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "10.0.0.4:8078"
    }
  ],
  "type": "sink"
}
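
To make the difference between the two sets of results concrete, the sketch
below contrasts two ways of splitting four tasks over two workers. It does
not reproduce Kafka Connect's actual assignment logic; the functions
round_robin, contiguous and connectors_on are hypothetical illustrations.
Both strategies give each worker the same number of tasks, but only the
first one spreads every connector over both workers.

```python
from collections import defaultdict


def round_robin(tasks, workers):
    """Assign task i to worker i modulo the number of workers."""
    assignment = defaultdict(list)
    for i, task in enumerate(tasks):
        assignment[workers[i % len(workers)]].append(task)
    return dict(assignment)


def contiguous(tasks, workers):
    """Give each worker one contiguous block of the task list."""
    size = -(-len(tasks) // len(workers))  # ceiling division
    return {w: tasks[i * size:(i + 1) * size] for i, w in enumerate(workers)}


def connectors_on(assignment):
    """List the distinct connectors that have a task on each worker."""
    return {w: sorted({c for c, _ in ts}) for w, ts in assignment.items()}


# (connector, task number) pairs, matching C1T1, C1T2, C2T1, C2T2.
tasks = [("C1", 1), ("C1", 2), ("C2", 1), ("C2", 2)]
workers = ["W1", "W2"]

spread = round_robin(tasks, workers)
packed = contiguous(tasks, workers)

print("round robin:", spread, connectors_on(spread))
print("contiguous: ", packed, connectors_on(packed))
```

The round-robin split corresponds to what I see after first registration,
and the contiguous split to what I see after the worker restart.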

Regards and Thanks
Deepak Raghav



On Wed, May 20, 2020 at 9:29 PM Robin Moffatt <ro...@confluent.io> wrote:

> Thanks for the clarification. If this is an actual problem that you're
> encountering and need a solution to, then since the task allocation is not
> deterministic, it sounds like you need to deploy separate worker clusters
> based on the workload patterns that you are seeing and the machine
> resources available.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Wed, 20 May 2020 at 16:39, Deepak Raghav <deepakragha...@gmail.com>
> wrote:
>
> > Hi Robin
> >
> > Replying to your query, i.e.
> >
> > One thing I'd ask at this point is though if it makes any difference
> > where the tasks execute?
> >
> > It actually makes a difference to us. We have 16 connectors and, as I
> > described the task division earlier, the tasks of the first 8 connectors
> > are assigned to the first worker process and the tasks of the other 8
> > connectors to the other worker process. Note that these 16 connectors
> > are sink connectors, and each sink connector consumes messages from a
> > different topic. There may be a case when messages arrive only for the
> > topics of the first 8 connectors; because all the tasks of these
> > connectors are assigned to the first worker, the load on it would be
> > high while the other set of connectors on the other worker would be idle.
> >
> > Instead, if the tasks had been divided evenly, this case would have been
> > avoided, because the tasks of each connector would be present in both
> > worker processes, like below:
> >
> > *W1*                       *W2*
> >  C1T1                    C1T2
> >  C2T1                    C2T2
> >
> > I hope this answers your question.
> >
> >
> > Regards and Thanks
> > Deepak Raghav
> >
> >
> >
> > On Wed, May 20, 2020 at 4:42 PM Robin Moffatt <ro...@confluent.io>
> > wrote:
> >
> > > OK, I understand better now.
> > >
> > > You can read more about the guts of the rebalancing protocol that Kafka
> > > Connect uses as of Apache Kafka 2.3 and onwards here:
> > >
> > > https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
> > >
> > > One thing I'd ask at this point is though if it makes any difference
> > > where the tasks execute? The point of a cluster is that Kafka Connect
> > > manages the workload allocation. If you need workload separation and
> > > guaranteed execution locality I would suggest separate Kafka Connect
> > > distributed clusters.
> > >
> > >
> > > --
> > >
> > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > >
> > >
> > > On Wed, 20 May 2020 at 10:24, Deepak Raghav <deepakragha...@gmail.com>
> > > wrote:
> > >
> > > > Hi Robin
> > > >
> > > > Thanks for your reply.
> > > >
> > > > We have two workers on different IPs. The example I gave you was just
> > > > an example. We are using Kafka version 2.3.1.
> > > >
> > > > Let me explain again with a simple example.
> > > >
> > > > Suppose we have two EC2 nodes, N1 and N2, with worker processes W1 and
> > > > W2 running in distributed mode with the same group.id (i.e. in the
> > > > same cluster), and two connectors with two tasks each, i.e.
> > > >
> > > > Node N1: W1 is running
> > > > Node N2 : W2 is running
> > > >
> > > > First Connector (C1): task 1 with id C1T1 and task 2 with id C1T2
> > > > Second Connector (C2): task 1 with id C2T1 and task 2 with id C2T2
> > > >
> > > > Now suppose both the W1 and W2 worker processes are running and I
> > > > register connectors C1 and C2 one after another, i.e. sequentially, on
> > > > either worker process. The tasks are divided between the worker nodes
> > > > as shown below, which is expected.
> > > >
> > > > *W1*                       *W2*
> > > > C1T1                    C1T2
> > > > C2T1                    C2T2
> > > >
> > > > Now, suppose I stop one worker process, e.g. W2, and start it again
> > > > after some time. The task division changes as shown below, i.e. the
> > > > first connector's tasks move to W1 and the second connector's tasks
> > > > move to W2.
> > > >
> > > > *W1*                       *W2*
> > > > C1T1                    C2T1
> > > > C1T2                    C2T2
> > > >
> > > >
> > > > Please let me know whether this is understandable.
> > > >
> > > > Note: In production, we are going to have 16 connectors with 10 tasks
> > > > each and two worker nodes. With the above scenario, the tasks of the
> > > > first 8 connectors move to W1 and the tasks of the next 8 connectors
> > > > move to W2, which is not expected.
> > > >
> > > >
> > > > Regards and Thanks
> > > > Deepak Raghav
> > > >
> > > >
> > > >
> > > > On Wed, May 20, 2020 at 1:41 PM Robin Moffatt <ro...@confluent.io>
> > > > wrote:
> > > >
> > > > > So you're running two workers on the same machine (10.0.0.4), is
> > > > > that correct? Normally you'd run one worker per machine unless there
> > > > > was a particular reason otherwise.
> > > > > What version of Apache Kafka are you using?
> > > > > I'm not clear from your question if the distribution of tasks is
> > > > > presenting a problem to you (if so please describe why), or if
> > > > > you're just interested in the theory behind the rebalancing protocol?
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> > > > >
> > > > >
> > > > > On Wed, 20 May 2020 at 06:46, Deepak Raghav <deepakragha...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi
> > > > > >
> > > > > > Please, can anybody help me with this?
> > > > > >
> > > > > > Regards and Thanks
> > > > > > Deepak Raghav
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, May 19, 2020 at 1:37 PM Deepak Raghav <deepakragha...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Team
> > > > > > >
> > > > > > > We have two worker nodes in a cluster and 2 connectors with 10
> > > > > > > tasks each.
> > > > > > >
> > > > > > > Now, suppose we have two Kafka Connect processes, W1 (port 8080)
> > > > > > > and W2 (port 8078), already started in distributed mode, and
> > > > > > > then register the connectors. The 10 tasks of each connector are
> > > > > > > divided equally between the two workers, i.e. the first task of
> > > > > > > connector A goes to worker node W1 and the second task of
> > > > > > > connector A to worker node W2. Similarly, the first task of
> > > > > > > connector B goes to node W1 and the second task of connector B
> > > > > > > goes to node W2.
> > > > > > >
> > > > > > > e.g
> > > > > > > *#First Connector : *
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > *#Sec connector*
> > > > > > >
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > > But I have seen strange behavior: when I shut down the W2 worker
> > > > > > > node and start it again, the tasks are divided in a different
> > > > > > > way, i.e. all the tasks of connector A go to node W1 and the
> > > > > > > tasks of connector B go to node W2.
> > > > > > >
> > > > > > > Can you please have a look at this?
> > > > > > >
> > > > > > > *#First Connector*
> > > > > > >
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Dchchargeableevent",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8080"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8080"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > > *#Second Connector *:
> > > > > > >
> > > > > > > {
> > > > > > >   "name": "REGION_CODE_UPPER-Cdb_Neatransaction",
> > > > > > >   "connector": {
> > > > > > >     "state": "RUNNING",
> > > > > > >     "worker_id": "10.0.0.4:8078"
> > > > > > >   },
> > > > > > >   "tasks": [
> > > > > > >     {
> > > > > > >       "id": 0,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 1,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 2,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 3,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 4,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 5,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 6,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 7,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 8,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     },
> > > > > > >     {
> > > > > > >       "id": 9,
> > > > > > >       "state": "RUNNING",
> > > > > > >       "worker_id": "10.0.0.4:8078"
> > > > > > >     }
> > > > > > >   ],
> > > > > > >   "type": "sink"
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > Regards and Thanks
> > > > > > > Deepak Raghav
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
