Hey Hector,

That's a cool idea for the ConnectAssignor plugin.

While this proposal could be viewed as an "assignor" problem that a
custom assignor could solve, it's really about providing additional
context to the assignor which isn't present currently. This lack of
context would prevent a custom assignor from solving the resource
utilization problem adequately.

Thanks!
Greg

On Fri, Oct 20, 2023 at 9:58 AM Greg Harris <greg.har...@aiven.io> wrote:
>
> Mickael,
>
> Thank you for discussing that rejected alternative, I was almost going
> to propose it.
>
> > I still find the proposed mechanism limited and I'm not sure it really 
> > addressed the pain points I've experienced with Connect.
>
> I think that this KIP on its own is insufficient to solve the
> operational difficulties of Connect, and an external management layer
> is necessary. In this KIP I'm trying to find the minimum viable
> abstraction to allow a management layer to make decisions about
> placement, knowing that the abstraction may be non-ergonomic for
> "direct users" without a management layer mediating.
>
> > Connectors may also change the assignment of tasks at runtime so for 
> > example if task-2 is really busy (because it's assigned a partition with 
> > high throughput), this may not be true in 10 minutes as this partition is 
> > now assigned to task-1
>
> I think this is similar to a concern (#5) that Tom raised, and a
> limitation of the "task index" abstraction. I don't know if there is a
> way for us to manage this sort of fine-grained dynamic utilization of
> tasks. Once we start a task, it has some static resources assigned to
> it (via the JVM). If the resource requirements expand, the task will
> need to stop in order to move JVMs and change its resource
> allocation, and stopping the task may cause assignments to change and
> the workload to be distributed elsewhere.
>
> > I think the "hints" where to place a connector/tasks should come from the 
> > connector configuration as it's the engineers building a pipeline who 
> > know best the requirements (in terms of isolation, resources) of their 
> > workload. This is basically point 3) in my initial email. The mechanism you 
> > propose puts this burden on the cluster administrators who may well not 
> > know the workloads and also have to guess/know in advance to properly 
> > configure workers.
>
> In the Strimzi proposal I imagined that the resource limits would be
> chosen by the users creating the connector configurations, and the
> fact that they were beside the connector configuration rather than
> inside of it didn't change the ownership. I completely agree that the
> users defining configurations should also be responsible for setting
> resource limits.
>
> > I've not looked into the feasibility but I wonder if a simplified 
> > taint/selector approach could give us enough flexibility to make Connect 
> > behave better in containerized environments.
>
> Yes, a taint/selector system would be more helpful for Connect
> clusters without an external management layer, but significantly more
> complex. Do you have a strategy for specifying unique selectors for
> an arbitrarily sized set of tasks? Suppose a connector config wishes
> to give a different selector to each task,
> like `task-id=connector-id-N` where N is the index of the task. My
> best solution was a special $TASK_ID placeholder that would be filled
> in with the task number at runtime, but that felt like an inelegant
> carve-out.
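A rough sketch of that carve-out (the $TASK_ID placeholder and the selector syntax here are hypothetical, not something the KIP defines):

```python
def expand_selectors(template: str, num_tasks: int) -> list[str]:
    """Expand a hypothetical $TASK_ID placeholder into one selector per
    task, giving each of an arbitrary number of tasks a unique selector."""
    return [template.replace("$TASK_ID", str(i)) for i in range(num_tasks)]

# The connector config would declare one template; the runtime would
# expand it once the task count is known.
print(expand_selectors("task-id=my-connector-$TASK_ID", 3))
# → ['task-id=my-connector-0', 'task-id=my-connector-1', 'task-id=my-connector-2']
```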
>
> Thanks for your thoughts.
> Greg
>
> On Fri, Oct 20, 2023 at 6:25 AM Hector Geraldino (BLOOMBERG/ 919 3RD
> A) <hgerald...@bloomberg.net> wrote:
> >
> > Hi,
> >
> > I think different algorithms might work for different workloads/scenarios. I 
> > have some thoughts that are somewhat tangential to this KIP: it might be a 
> > good idea to elevate the ConnectAssignor to the category of plugin, so 
> > users can provide their own implementation.
> >
> > The fact that there's a public o.a.k.c.r.distributed.ConnectAssignor 
> > interface is brilliant (I actually wanted the same thing on the Kafka 
> > client side, alas: 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator).
> >  I think it should play well with the future Connect's counterpart of 
> > KIP-848 (new consumer rebalance protocol).
> >
> > I don't want to hijack this thread, but will definitely raise a KIP and 
> > start a discussion around this idea.
> >
> > From: dev@kafka.apache.org At: 10/20/23 07:21:11 UTC-4:00 To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-987: Connect Static Assignments
> >
> > Hi Greg,
> >
> > Thanks for the reply.
> >
> > I still find the proposed mechanism limited and I'm not sure it really
> > addressed the pain points I've experienced with Connect.
> > As you said different tasks from a connector may have different
> > workload. Connectors may also change the assignment of tasks at
> > runtime so for example if task-2 is really busy (because it's assigned
> > a partition with high throughput), this may not be true in 10 minutes
> > as this partition is now assigned to task-1. So having to specify which
> > tasks can run on each worker does not really help in this case.
> >
> > I think the "hints" where to place a connector/tasks should come from
> > the connector configuration as it's the engineers building a pipeline
> > who know best the requirements (in terms of isolation, resources) of
> > their workload. This is basically point 3) in my initial email. The
> > mechanism you propose puts this burden on the cluster administrators
> > who may well not know the workloads and also have to guess/know in
> > advance to properly configure workers.
> >
> > I've not looked into the feasibility but I wonder if a simplified
> > taint/selector approach could give us enough flexibility to make
> > Connect behave better in containerized environments. I understand it's
> > an alternative you rejected but I think could have some benefits. Here
> > is my green field thinking:
> > Add 2 new fields in the connector config: placement and tags.
> > Placement defines the degree of isolation a task requires, it accepts 3
> > values: any (can be placed anywhere like today, the default),
> > colocated (can run on a worker with other tasks from this connector),
> > isolated (requires a dedicated worker). I think these degrees of
> > isolation should cover most use cases. Tags accepts a collection of
> > key=value pairs. These can have arbitrary values and are meant to mean
> > something to the management system (for example Strimzi). The accepted
> > values could be configured on the workers by the administrators as
> > they also operate the management system.
> >
> > When a connector is created, the runtime tries to place tasks on the
> > available workers by matching the placement and tags. If no suitable
> > workers are found, the tasks stay in unassigned state and the runtime
> > waits for the management system to create the necessary workers.
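A rough sketch of how that matching could work (the worker/placement model here is invented for illustration, not a proposed API):

```python
def worker_matches(worker_tags: dict, worker_jobs: set,
                   connector: str, placement: str, tags: dict) -> bool:
    """Hypothetical matching rule for the placement/tags idea above.
    worker_jobs is the set of connector names already running on the worker."""
    # every requested tag must be present on the worker with the same value
    if any(worker_tags.get(k) != v for k, v in tags.items()):
        return False
    if placement == "any":
        return True  # no isolation required
    if placement == "colocated":
        # may share a worker, but only with tasks of the same connector
        return worker_jobs <= {connector}
    if placement == "isolated":
        return not worker_jobs  # requires a dedicated, empty worker
    raise ValueError(f"unknown placement: {placement}")
```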
> >
> > We could even envisage starting with only the placement field as in my
> > opinion this is what brings the most value to users.
> >
> > Thanks,
> > Mickael
> >
> > On Wed, Oct 18, 2023 at 8:12 PM Greg Harris
> > <greg.har...@aiven.io.invalid> wrote:
> > >
> > > Hey Sagar,
> > >
> > > Thanks for the questions. I hope you find the answers satisfying:
> > >
> > > 1. This is detailed in the KIP two sentences earlier: "If the
> > > connect.protocol is set to static, each worker will send its
> > > static.connectors and static.tasks to the coordinator during
> > > rebalances."
> > >
> > > 2. If you have a static worker and a wildcard worker, the static
> > > worker will be assigned the work preferentially. If the static worker
> > > goes offline, the wildcard worker will be used as a backup.
> > >
> > > 3. I don't think that new Connect users will make use of this feature,
> > > but I've added that clarification.
> > >
> > > 4. Users can implement the strategy you're describing by leaving the
> > > static.connectors field unset. I think that Connect should include
> > > static.connectors for users that do want to control the placement of
> > > connectors.
> > >
> > > 5. Yes. Arbitrary here just means that the assignment is not
> > > influenced by the static assignment.
> > >
> > > 6. Yes. There are no guardrails that ensure that the balance of the
> > > static assignments is better than the builtin algorithm because we
> > > have no method to compare them.
> > >
> > > 7. If the whole cluster uses static assignments with each job only
> > > specified on one worker, the assignments are completely sticky. If a
> > > worker goes offline, those tasks will be offline until that worker
> > > comes back.
> > > If there are multiple workers for a single job, that is specified as
> > > "arbitrary". We could choose to wait for the delay to elapse or
> > > immediately reassign it, the KIP as written could be implemented by
> > > either.
> > > If the assignment would land on a wildcard worker, that should use
> > > cooperative rules, so we would need to respect the rebalance delay.
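A sketch of the preference order described above (the worker and job shapes are invented here, and the choice among multiple valid static workers is deliberately unspecified in the KIP):

```python
def place_job(job: str, workers: list[dict]):
    """Sketch of the preference order: a static worker that lists the job
    wins; otherwise a wildcard worker acts as backup; otherwise the job
    stays offline until its worker comes back."""
    statics = [w for w in workers if job in w.get("static", set())]
    wildcards = [w for w in workers if "static" not in w]
    if statics:
        return statics[0]["name"]   # tie-break among statics is arbitrary
    if wildcards:
        return wildcards[0]["name"]
    return None  # unassigned: fully sticky, waits for the static worker

workers = [{"name": "w1", "static": {"c-0"}}, {"name": "w2"}]
print(place_job("c-0", workers), place_job("c-1", workers))  # → w1 w2
```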
> > >
> > > Thanks!
> > > Greg
> > >
> > > On Wed, Oct 18, 2023 at 6:21 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> > > >
> > > > Hi Greg,
> > > >
> > > > Thanks for the KIP. I have a few questions/comments:
> > > >
> > > > 1) You mentioned that during a rebalance if all the members of the 
> > > > cluster
> > > > support the static protocol, then it would use the steps outlined in the
> > > > Proposed Section to do the assignments. In those steps, the leader
> > > > identifies the static/wildcard jobs. It is not clear to me how the 
> > > > leader
> > > > makes that distinction? Are we going to enhance the embedded protocol to
> > > > also write the static jobs that the worker owns as part of its 
> > > > assignment?
> > > > As of today, the workers just write only the owned/revoked 
> > > > connectors/tasks
> > > > in case of incremental and above and only owned connectors/tasks in 
> > > > case of
> > > > eager.
> > > >
> > > > 2) Could you also elaborate this statement a bit:
> > > >
> > > > > A cluster with both static and wildcard workers can use wildcard 
> > > > > workers
> > > > > as backups for disaster recovery.
> > > >
> > > >
> > > > I see in the Strimzi proposal you have explained this scenario (the case
> > > > where shared workers are empty as pods aren't created yet etc IIUC), but
> > > > reading the KIP doesn't make it too obvious.
> > > >
> > > > 3) A nit comment but we should probably call out that there is no need 
> > > > to
> > > > assign the connector and task to the same worker when configuring them. 
> > > > I
> > > > guess for new users of Connect this could be a source of confusion. 
> > > > WDYT?
> > > >
> > > > 4) Staying on the above, I also wanted to know why we need to set
> > > > static.connectors? As such, connectors generally aren't resource 
> > > > intensive.
> > > > Can we not let connectors be assigned using the same algorithm as today 
> > > > and
> > > > let only tasks be pinned to workers?
> > > >
> > > > 5) In the proposed section you also mentioned that
> > > >
> > > > If static assignments are not specified, or at least one worker in the
> > > > > cluster is not using the static  protocol, they are ignored and the
> > > > > worker may receive an arbitrary assignment.
> > > >
> > > >
> > > > The arbitrary assignment is driven by the minimum supported protocol 
> > > > right
> > > > (sessioned/eager)?
> > > >
> > > > 6) Just for my understanding, because now the onus is on the connect 
> > > > admin
> > > > to specify which connector/task should be running where, can there be a
> > > > situation where there could be an imbalance in terms of number of
> > > > connectors/tasks running on workers (due to some oversight on part of 
> > > > admin
> > > > or some bug in the tooling used to generate worker configs if any) ? 
> > > > There
> > > > could be an imbalance even today as well but the protocol tries to 
> > > > balance
> > > > as much as it can in an automated fashion.
> > > >
> > > > 7) Lastly, in a cluster where all workers support the static protocol,
> > > > there is still no guarantee that all connectors/tasks would be static. 
> > > > In
> > > > such a case, what happens when one of the workers is restarted or is 
> > > > being
> > > > rolled? Would the assignment eventually be sticky when the worker comes
> > > > back? Does this new protocol also abide by the scheduled rebalance 
> > > > delays?
> > > > Maybe this has been explained in the KIP but it wasn't clear to me when 
> > > > I
> > > > read it.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > > On Tue, Oct 10, 2023 at 11:22 PM Greg Harris 
> > > > <greg.har...@aiven.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi Mickael,
> > > > >
> > > > > I'm not Chris but I hope I can still respond to your questions :)
> > > > >
> > > > > 1a. This system behaves best when connectors and tasks are known in
> > > > > advance, but I don't think that is a requirement. I clarified that the
> > > > > worker configurations are permissive of non-existent jobs, which
> > > > > allows for bootstrapping an empty cluster and concurrent worker & job
> > > > > provisioning.
> > > > > 1b. I think the wildcard suggestion is similar to the rejected
> > > > > alternative "implement a more complex worker selector...". While we
> > > > > could add that sort of functionality, I think it is more difficult to
> > > > > express and reason about. For example, with the current configuration,
> > > > > users can ensure that at most one job is assigned to a JVM. If a user
> > > > > wanted to use wildcards with the same constraint, those wildcards
> > > > > would need to express "at most one of <matching job>".
> > > > > 1c. The javadoc doesn't say what happens to the additional tasks, but
> > > > > I think we could start enforcing it if it makes the
> > > > > fixed-static-assignment use-case more reliable. I have only ever seen
> > > > > additional tasks emitted due to rather serious bugs in connector
> > > > > implementations.
> > > > >
> > > > > 2. Yes, I've added that information to the proposal. I used the `GET
> > > > > /connectors/{connector}` API as it doesn't list the task configs, but
> > > > > it performs the same function.
> > > > >
> > > > > 3. Do you mean the resource limits and requests? This is the rejected
> > > > > alternative "Model per-job resource constraints and measure resource
> > > > > utilization within a single JVM", let me know if the reasoning there
> > > > > is not convincing.
> > > > >
> > > > > 4. I have not explored rack awareness for Connect in detail. I expect
> > > > > that rack awareness would require some compromise on the "least
> > > > > disruption" property of incremental cooperative rebalancing for shared
> > > > > JVMs. We could discuss that in a future KIP.
> > > > > As for this proposal, because the management layer is responsible for
> > > > > placing workers and placing tasks on those workers, it would also be
> > > > > responsible for implementing rack-awareness in isolated JVMs.
> > > > >
> > > > > Thanks a lot!
> > > > > Greg
> > > > >
> > > > > On Tue, Oct 10, 2023 at 10:08 AM Greg Harris <greg.har...@aiven.io> 
> > > > > wrote:
> > > > > >
> > > > > > Hey Tom,
> > > > > >
> > > > > > Thanks for your comments! I appreciate your insight here and on the
> > > > > > Strimzi proposal.
> > > > > >
> > > > > > 1. Fixed, thanks.
> > > > > >
> > > > > > 2. We haven't removed a protocol yet, so workers still announce that
> > > > > > they can use the old protocol versions, and the group coordinator 
> > > > > > will
> > > > > > choose the highest mutually supported protocol version. This would 
> > > > > > be
> > > > > > true even if static was not a superset of sessioned.
> > > > > > For the static feature, having workers in the cluster that do not
> > > > > > support static assignments will cause the common version to not
> > > > > > support static assignments, and will mean that workers which 
> > > > > > announce
> > > > > > static.connectors and static.tasks may be assigned jobs outside of
> > > > > > their static assignments, which is called out in the KIP.
> > > > > > If the version gap is wider, then the other protocol-controlled
> > > > > > features will also change behavior (i.e. no longer using
> > > > > > incremental-cooperative-assignments, or not including REST
> > > > > > authentication headers). There would be no difference to the 
> > > > > > behavior
> > > > > > of this feature.
> > > > > >
> > > > > > 3. Arbitrary here is specifically meant to say that one of the 
> > > > > > workers
> > > > > > will be chosen, but outside implementations should not rely on the
> > > > > > method of choosing. This allows us to change/improve the assignment
> > > > > > algorithm in the future without a KIP. The reason I've specified it
> > > > > > this way is because I'd like to choose the "least loaded" worker, 
> > > > > > but
> > > > > > I don't know if that is well-defined or tractable to compute. I 
> > > > > > expect
> > > > > > that in practice we will use some heuristics to tie-break, but won't
> > > > > > be able to ensure that the algorithm will be deterministic and/or
> > > > > > optimal. If we find an improvement to the heuristics, I think that 
> > > > > > we
> > > > > > should be able to implement them without a KIP.
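As one example of such a heuristic (purely illustrative, not the KIP's algorithm):

```python
def least_loaded(candidates: list[dict]) -> dict:
    """One plausible tie-break heuristic: the worker with the fewest
    assigned jobs wins. "Least loaded" is not well-defined in general,
    so this is only a heuristic, deliberately left unspecified."""
    return min(candidates, key=lambda w: len(w["jobs"]))
```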
> > > > > >
> > > > > > 4. I added a section for "Choosing static assignments" that details
> > > > > > both the "dynamic" and "fixed" use-cases. With regards to tasks.max:
> > > > > > The javadoc for the `taskConfigs` method does state that it
> > > > > > "produc[es] at most {@code maxTasks} configurations," so static
> > > > > > inference of the task IDs is valid. I think that statically 
> > > > > > inferring
> > > > > > the number of tasks will always come with the trade-off that you can
> > > > > > over-provision task workers, it seems fundamental. I think it's a
> > > > > > trade-off that users of this feature will make for themselves, and
> > > > > > both methods will be supported.
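A sketch of what static inference from tasks.max yields (illustrative only; it assumes Connect's usual connector-name/index convention for task IDs):

```python
def inferred_task_ids(connector: str, tasks_max: int) -> list[str]:
    """Upper bound on the task IDs a connector can create, inferred from
    tasks.max alone. The connector may create fewer tasks at runtime, so
    workers provisioned for the tail of this list may sit idle."""
    return [f"{connector}-{i}" for i in range(tasks_max)]

print(inferred_task_ids("my-sink", 4))
# → ['my-sink-0', 'my-sink-1', 'my-sink-2', 'my-sink-3']
```

This makes the over-provisioning trade-off concrete: static workers must be configured for the full list even when only a prefix of it ever runs.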
> > > > > >
> > > > > > 5. No I don't think that it is a safe assumption, as there are
> > > > > > certainly connectors out there which violate it. For example, sink
> > > > > > connectors with a heterogeneous set of topics, and source connectors
> > > > > > which distribute a dynamic heterogeneous workload internally. I
> > > > > > believe there are a few connectors with a distinct "task-0" which
> > > > > > consumes more resources than the other tasks which would have some
> > > > > > index-stability. Index is chosen because it is the only 
> > > > > > distinguishing
> > > > > > feature we have without inspecting task configurations, and have
> > > > > > control over via coordination and assignments.
> > > > > >
> > > > > > Thanks again!
> > > > > > Greg
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 10, 2023 at 8:05 AM Mickael Maison
> > > > > > <mickael.mai...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > Thanks for the KIP!
> > > > > > >
> > > > > > > 1) With this mechanism it seems Connect administrators have to 
> > > > > > > know in
> > > > > > > advance the names of connectors that will be running. As far as 
> > > > > > > I can
> > > > > > > tell, static.connectors and static.tasks will require restarting 
> > > > > > > the
> > > > > > > worker to change. Also you don't specify if these configurations 
> > > > > > > can
> > > > > > > accept wildcards or not. This could be useful for tasks as it's 
> > > > > > > hard
> > > > > > > to know in advance how many tasks will exist, especially as 
> > > > > > > connectors
> > > > > > > can in theory ignore tasks.max (I wonder if we should make the 
> > > > > > > runtime
> > > > > > > enforce that config).
> > > > > > >
> > > > > > > 2) The KIP does not explicitly mention a mechanism for management
> > > > > > > systems to figure out what to do. As far as I can tell (and 
> > > > > > > looking at
> > > > > > > the proposal in Strimzi), these systems will have to poll the 
> > > > > > > REST API
> > > > > > > to do so. If I understand correctly, management systems would 
> > > > > > > first
> > > > > > > have to poll the GET /connectors and then GET 
> > > > > > > /connectors/{name}/tasks
> > > > > > > for each connector, is that right?
> > > > > > >
> > > > > > > 3) In the Strimzi proposal you mention adding custom fields to the
> > > > > > > Connector resource to instruct it what to do. I wonder if these 
> > > > > > > should
> > > > > > > be added directly in the connector configuration, rather than just
> > > > > > > exist in the management system. That way all the data necessary to
> > > > > > > make decisions would be available in the same place, the REST API.
> > > > > > >
> > > > > > > 4) This is not something you mention in the KIP and I don't think 
> > > > > > > it's
> > > > > > > a blocker for this KIP but I wonder if you thought about rack
> > > > > > > awareness too. I'm bringing it up as I think it's a similar concern 
> > > > > > > when
> > > > > > > it comes to deciding where to assign tasks.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Mickael
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Oct 10, 2023 at 8:39 AM Tom Bentley <tbent...@redhat.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Greg,
> > > > > > > >
> > > > > > > > Many thanks for the KIP. Here are a few initial questions
> > > > > > > >
> > > > > > > > 1. Incomplete sentence: "But Connect is not situated to be
> > > > > > > > able to manage resources directly, as workers are given a fixed "
> > > > > > > > 2. You explain how sessioned is now a subset of static, but what
> > > > > > > > happens in a cluster where some workers are using static and some
> > > > > > > > are using either eager or compatible?
> > > > > > > > 3. "Assign each unassigned static job to a static worker which
> > > > > > > > specifies that job, choosing arbitrarily if there are multiple
> > > > > > > > valid workers." I think there might be less ambiguous words than
> > > > > > > > "arbitrary" to specify this behaviour. Hashing the task name
> > > > > > > > would _appear_ pretty arbitrary to the user, but would be
> > > > > > > > deterministic. Picking at random would be non-deterministic. Even
> > > > > > > > better if you have a rationale.
> > > > > > > > 4. You don't describe how a user, or automated system, starting
> > > > > > > > with a set of connectors, should find out the tasks that they
> > > > > > > > want to run. This relates to the contract of
> > > > > > > > org.apache.kafka.connect.connector.Connector#taskConfigs(int maxTasks).
> > > > > > > > AFAIK (please correct me if I'm wrong, because it's a long time
> > > > > > > > since I looked at this code) there's nothing that validates that
> > > > > > > > the returned list has at most the `maxTasks` and connectors can,
> > > > > > > > of course, return fewer than that many tasks. So without deeper
> > > > > > > > knowledge of a particular connector it's not clear to the
> > > > > > > > user/operator how to configure their static workers and static
> > > > > > > > assignments.
> > > > > > > > 5. Is there a lurking assumption that task indices are stable?
> > > > > > > > E.g. that the task with index 3 will always be the
> > > > > > > > resource-intensive one. I can see that that would often be a
> > > > > > > > reliable assumption, but it's not clear to me that it is always
> > > > > > > > the case.
> > > > > > > >
> > > > > > > > Thanks again,
> > > > > > > >
> > > > > > > > Tom
> > > > > > > >
> > > > > > > > On Fri, 6 Oct 2023 at 12:36, Greg Harris
> > > > > > > > <greg.har...@aiven.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hey everyone!
> > > > > > > > >
> > > > > > > > > I'd like to propose an improvement to Kafka Connect's 
> > > > > > > > > scheduling
> > > > > > > > > algorithm, with the goal of improving the operability of 
> > > > > > > > > connect
> > > > > > > > > clusters through resource isolation.
> > > > > > > > >
> > > > > > > > > Find the KIP here:
> > > > > > > > >
> > > > > > > > >
> > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments
> > > > > > > > >
> > > > > > > > > This feature is primarily intended to be consumed by cluster
> > > > > > > > > management systems, so I've opened a sister proposal in the
> > > > > > > > > Strimzi
> > > > > > > > > project that uses this feature to provide user-facing resource
> > > > > > > > > isolation. The base feature is generic, so it is usable 
> > > > > > > > > manually
> > > > > and
> > > > > > > > > with cluster management systems other than Strimzi.
> > > > > > > > >
> > > > > > > > > Find the proposal here:
> > > > > > > > > https://github.com/strimzi/proposals/pull/96
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Greg Harris
> > > > > > > > >
> > > > > > > > >
