Hey Hector, That's a cool idea for the ConnectAssignor plugin.
While this proposal could be viewed as an "assignor" problem that a custom assignor could solve, it's really about providing additional context to the assignor that isn't currently present. This lack of context would prevent a custom assignor from solving the resource utilization problem adequately. Thanks! Greg On Fri, Oct 20, 2023 at 9:58 AM Greg Harris <greg.har...@aiven.io> wrote: > > Mickael, > > Thank you for discussing that rejected alternative, I was almost going > to propose it. > > > I still find the proposed mechanism limited and I'm not sure it really > > addressed the pain points I've experienced with Connect. > > I think that this KIP on its own is insufficient to solve the > operational difficulties of Connect, and an external management layer > is necessary. In this KIP I'm trying to find the minimum viable > abstraction to allow a management layer to make decisions about > placement, knowing that the abstraction may be non-ergonomic for > "direct users" without a management layer mediating. > > > Connectors may also change the assignment of tasks at runtime so for > > example if task-2 is really busy (because it's assigned a partition with > > high throughput), this may not be true in 10 minutes as this partition is > > now assigned to task-1 > > I think this is similar to a concern (#5) that Tom raised, and a > limitation of the "task index" abstraction. I don't know if there is a > way for us to manage this sort of fine-grained dynamic utilization of > tasks. Once we start a task, it has some static resources assigned to > it (via the JVM). If you notice the resource requirements expand, it > will need to stop in order to move JVMs and change its resource > allocation, and stopping the task may cause assignments to change and > the workload to be distributed elsewhere. 
> > > I think the "hints" where to place a connector/tasks should come from the > > connector configuration as it's the engineers building a pipeline who > > know best the requirements (in terms of isolation, resources) of their > > workload. This is basically point 3) in my initial email. The mechanism you > > propose puts this burden on the cluster administrators who may well not > > know the workloads and also have to guess/know in advance to properly > > configure workers. > > In the Strimzi proposal I imagined that the resource limits would be > chosen by the users creating the connector configurations, and the > fact that they were beside the connector configuration rather than > inside of it didn't change the ownership. I completely agree that the > users defining configurations should also be responsible for setting > resource limits. > > > I've not looked into the feasibility but I wonder if a simplified > > taint/selector approach could give us enough flexibility to make Connect > > behave better in containerized environments. > > Yes, a taint/selector system would be more helpful for Connect > clusters without an external management layer, but significantly more > complex. Do you have a strategy for specifying unique selectors for an > arbitrarily sized set of tasks? > Suppose a connector config wishes to give a different selector to each task, > like `task-id=connector-id-N` where N is the index of the task. My > best solution was a special $TASK_ID placeholder that would be filled > in with the task number at runtime, but that felt like an inelegant > carve-out. > > Thanks for your thoughts. > Greg > > On Fri, Oct 20, 2023 at 6:25 AM Hector Geraldino (BLOOMBERG/ 919 3RD > A) <hgerald...@bloomberg.net> wrote: > > > > Hi, > > > > I think different algorithms might work for different workloads/scenarios. 
I > > have some thoughts that are somewhat tangential to this KIP: it might be a > > good idea to elevate the ConnectAssignor to the category of plugin, so > > users can provide their own implementation. > > > > The fact that there's a public o.a.k.c.r.distributed.ConnectAssignor > > interface is brilliant (I actually wanted the same thing on the Kafka > > client side, alas > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-795%3A+Add+public+APIs+for+AbstractCoordinator). > > I think it should play well with the future Connect's counterpart of > > KIP-848 (new consumer rebalance protocol). > > > > I don't want to hijack this thread, but will definitely raise a KIP and > > start a discussion around this idea. > > > > From: dev@kafka.apache.org At: 10/20/23 07:21:11 UTC-4:00To: > > dev@kafka.apache.org > > Subject: Re: [DISCUSS] KIP-987: Connect Static Assignments > > > > Hi Greg, > > > > Thanks for the reply. > > > > I still find the proposed mechanism limited and I'm not sure it really > > addressed the pain points I've experienced with Connect. > > As you said different tasks from a connector may have different > > workloads. Connectors may also change the assignment of tasks at > > runtime so for example if task-2 is really busy (because it's assigned > > a partition with high throughput), this may not be true in 10 minutes > > as this partition is now assigned to task-1. So having to put which > > tasks can run on each worker does not really help in this case. > > > > I think the "hints" where to place a connector/tasks should come from > > the connector configuration as it's the engineers building a pipeline > > who know best the requirements (in terms of isolation, resources) of > > their workload. This is basically point 3) in my initial email. The > > mechanism you propose puts this burden on the cluster administrators > > who may well not know the workloads and also have to guess/know in > > advance to properly configure workers. 
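To make the "advance configuration" Mickael refers to concrete: under KIP-987 it would take the form of the proposed worker properties. A minimal sketch, assuming the property names from the KIP (the connector and task names here are made up for illustration):

```properties
# Illustrative worker config for the proposed 'static' protocol (KIP-987).
# Property names come from the KIP; connector/task names are hypothetical.
connect.protocol=static
static.connectors=inventory-source
static.tasks=inventory-source-0,inventory-source-1
```

A worker configured this way would announce these jobs to the coordinator during rebalances, and the leader would prefer it when assigning them.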
> > > > I've not looked into the feasibility but I wonder if a simplified > > taint/selector approach could give us enough flexibility to make > > Connect behave better in containerized environments. I understand it's > > an alternative you rejected but I think it could have some benefits. Here > > is my green field thinking: > > Add 2 new fields in the connector config: placement and tags. > > Placement defines the degree of isolation a task requires, and it accepts 3 > > values: any (can be placed anywhere like today, the default), > > colocated (can run on a worker with other tasks from this connector), > > isolated (requires a dedicated worker). I think these degrees of > > isolation should cover most use cases. Tags accepts a collection of > > key=value pairs. These can have arbitrary values and are meant to mean > > something to the management system (for example Strimzi). The accepted > > values could be configured on the workers by the administrators as > > they also operate the management system. > > > > When a connector is created, the runtime tries to place tasks on the > > available workers by matching the placement and tags. If no suitable > > workers are found, the tasks stay in an unassigned state and the runtime > > waits for the management system to create the necessary workers. > > > > We could even envisage starting with only the placement field as in my > > opinion this is what brings the most value to users. > > > > Thanks, > > Mickael > > > > On Wed, Oct 18, 2023 at 8:12 PM Greg Harris > > <greg.har...@aiven.io.invalid> wrote: > > > > > > Hey Sagar, > > > > > > Thanks for the questions. I hope you find the answers satisfying: > > > > > > 1. This is detailed in the KIP two sentences earlier: "If the > > > connect.protocol is set to static, each worker will send its > > > static.connectors and static.tasks to the coordinator during > > > rebalances." > > > > > > 2. 
If you have a static worker and a wildcard worker, the static > > > worker will be assigned the work preferentially. If the static worker > > > goes offline, the wildcard worker will be used as a backup. > > > > > > 3. I don't think that new Connect users will make use of this feature, > > > but I've added that clarification. > > > > > > 4. Users can implement the strategy you're describing by leaving the > > > static.connectors field unset. I think that Connect should include > > > static.connectors for users that do want to control the placement of > > > connectors. > > > > > > 5. Yes. Arbitrary here just means that the assignment is not > > > influenced by the static assignment. > > > > > > 6. Yes. There are no guardrails that ensure that the balance of the > > > static assignments is better than the built-in algorithm because we > > > have no method to compare them. > > > > > > 7. If the whole cluster uses static assignments with each job only > > > specified on one worker, the assignments are completely sticky. If a > > > worker goes offline, those tasks will be offline until that worker > > > comes back. > > > If there are multiple workers for a single job, that is specified as > > > "arbitrary". We could choose to wait for the delay to elapse or > > > immediately reassign it; the KIP as written could be implemented > > > either way. > > > If the assignment would land on a wildcard worker, that should use > > > cooperative rules, so we would need to respect the rebalance delay. > > > > > > Thanks! > > > Greg > > > > > > On Wed, Oct 18, 2023 at 6:21 AM Sagar <sagarmeansoc...@gmail.com> wrote: > > > > > > > > Hi Greg, > > > > > > > > Thanks for the KIP. I have a few questions/comments: > > > > > > > > 1) You mentioned that during a rebalance if all the members of the > > > > cluster > > > > support the static protocol, then it would use the steps outlined in the > > > > Proposed Section to do the assignments. 
In those steps, the leader > > > > identifies the static/wildcard jobs. It is not clear to me how the > > > > leader > > > > makes that distinction. Are we going to enhance the embedded protocol to > > > > also write the static jobs that the worker owns as part of its > > > > assignment? > > > > As of today, the workers just write only the owned/revoked > > > > connectors/tasks > > > > in case of incremental and above and only owned connectors/tasks in > > > > case of > > > > eager. > > > > > > > > 2) Could you also elaborate this statement a bit: > > > > > > > > > A cluster with both static and wildcard workers can use wildcard > > > > > workers > > > > > as backups for disaster recovery. > > > > > > > > > > > > I see in the Strimzi proposal you have explained this scenario (the case > > > > where shared workers are empty as pods aren't created yet etc IIUC), but > > > > reading the KIP doesn't make it too obvious. > > > > > > > > 3) A nit comment but we should probably call out that there is no need > > > > to > > > > assign the connector and task to the same worker when configuring them. > > > > I > > > > guess for new users of Connect this could be a source of confusion. > > > > WDYT? > > > > > > > > 4) Staying on the above, I also wanted to know why we need to set > > > > static.connectors? As such, connectors generally aren't > > > > resource-intensive. > > > > Can we not let connectors be assigned using the same algorithm as today > > > > and > > > > let only tasks be pinned to workers? > > > > > > > > 5) In the proposed section you also mentioned that > > > > > > > > If static assignments are not specified, or at least one worker in the > > > > > cluster is not using the static protocol, they are ignored and the > > > > > worker may receive an arbitrary assignment. > > > > > > > > > > > > The arbitrary assignment is driven by the minimum supported protocol > > > > right > > > > (sessioned/eager)? 
> > > > > > > > 6) Just for my understanding, because now the onus is on the Connect > > > > admin > > > > to specify which connector/task should be running where, can there be a > > > > situation where there could be an imbalance in terms of number of > > > > connectors/tasks running on workers (due to some oversight on the part of > > > > the admin > > > > or some bug in the tooling used to generate worker configs, if any)? > > > > There > > > > could be an imbalance even today as well but the protocol tries to > > > > balance > > > > as much as it can in an automated fashion. > > > > > > > > 7) Lastly, in a cluster where all workers support the static protocol, > > > > there is still no guarantee that all connectors/tasks would be static. > > > > In > > > > such a case, what happens when one of the workers is restarted or is > > > > being > > > > rolled? Would the assignment eventually be sticky when the worker comes > > > > back? Does this new protocol also abide by the scheduled rebalance > > > > delays? > > > > Maybe this has been explained in the KIP but it wasn't clear to me when > > > > I > > > > read it. > > > > > > > > Thanks! > > > > Sagar. > > > > > > > > On Tue, Oct 10, 2023 at 11:22 PM Greg Harris > > > > <greg.har...@aiven.io.invalid> > > > > wrote: > > > > > > > > > Hi Mickael, > > > > > > > > > > I'm not Chris but I hope I can still respond to your questions :) > > > > > > > > > > 1a. This system behaves best when connectors and tasks are known in > > > > > advance, but I don't think that is a requirement. I clarified that the > > > > > worker configurations are permissive of non-existent jobs, which > > > > > allows for bootstrapping an empty cluster and concurrent worker & job > > > > > provisioning. > > > > > 1b. I think the wildcard suggestion is similar to the rejected > > > > > alternative "implement a more complex worker selector...". 
While we > > > > > could add that sort of functionality, I think it is more difficult to > > > > > express and reason about. For example, with the current configuration, > > > > > users can ensure that at most one job is assigned to a JVM. If a user > > > > > wanted to use wildcards with the same constraint, those wildcards > > > > > would need to express "at most one of <matching job>". > > > > > 1c. The javadoc doesn't say what happens to the additional tasks, but > > > > > I think we could start enforcing it if it makes the > > > > > fixed-static-assignment use-case more reliable. I have only ever seen > > > > > additional tasks emitted due to rather serious bugs in connector > > > > > implementations. > > > > > > > > > > 2. Yes, I've added that information to the proposal. I used the `GET > > > > > /connectors/{connector}` API as it doesn't list the task configs, but > > > > > it performs the same function. > > > > > > > > > > 3. Do you mean the resource limits and requests? This is the rejected > > > > > alternative "Model per-job resource constraints and measure resource > > > > > utilization within a single JVM", let me know if the reasoning there > > > > > is not convincing. > > > > > > > > > > 4. I have not explored rack awareness for Connect in detail. I expect > > > > > that rack awareness would require some compromise on the "least > > > > > disruption" property of incremental cooperative rebalancing for shared > > > > > JVMs. We could discuss that in a future KIP. > > > > > As for this proposal, because the management layer is responsible for > > > > > placing workers and placing tasks on those workers, it would also be > > > > > responsible for implementing rack-awareness in isolated JVMs. > > > > > > > > > > Thanks a lot! > > > > > Greg > > > > > > > > > > On Tue, Oct 10, 2023 at 10:08 AM Greg Harris <greg.har...@aiven.io> > > > > > wrote: > > > > > > > > > > > > Hey Tom, > > > > > > > > > > > > Thanks for your comments! 
I appreciate your insight here and on the > > > > > > Strimzi proposal. > > > > > > > > > > > > 1. Fixed, thanks. > > > > > > > > > > > > 2. We haven't removed a protocol yet, so workers still announce that > > > > > > they can use the old protocol versions, and the group coordinator > > > > > > will > > > > > > choose the highest mutually supported protocol version. This would > > > > > > be > > > > > > true even if static was not a superset of sessioned. > > > > > > For the static feature, having workers in the cluster that do not > > > > > > support static assignments will cause the common version to not > > > > > > support static assignments, and will mean that workers which > > > > > > announce > > > > > > static.connectors and static.tasks may be assigned jobs outside of > > > > > > their static assignments, which is called out in the KIP. > > > > > > If the version gap is wider, then the other protocol-controlled > > > > > > features will also change behavior (i.e. no longer using > > > > > > incremental-cooperative-assignments, or not including REST > > > > > > authentication headers). There would be no difference to the > > > > > > behavior > > > > > > of this feature. > > > > > > > > > > > > 3. Arbitrary here is specifically meant to say that one of the > > > > > > workers > > > > > > will be chosen, but outside implementations should not rely on the > > > > > > method of choosing. This allows us to change/improve the assignment > > > > > > algorithm in the future without a KIP. The reason I've specified it > > > > > > this way is because I'd like to choose the "least loaded" worker, > > > > > > but > > > > > > I don't know if that is well-defined or tractable to compute. I > > > > > > expect > > > > > > that in practice we will use some heuristics to tie-break, but won't > > > > > > be able to ensure that the algorithm will be deterministic and/or > > > > > > optimal. 
If we find an improvement to the heuristics, I think that > > > > > > we > > > > > > should be able to implement them without a KIP. > > > > > > > > > > > > 4. I added a section for "Choosing static assignments" that details > > > > > > both the "dynamic" and "fixed" use-cases. With regards to tasks.max: > > > > > > The javadoc for the `taskConfigs` method does state that it > > > > > > "produc[es] at most {@code maxTasks} configurations," so static > > > > > > inference of the task IDs is valid. I think that statically > > > > > > inferring > > > > > > the number of tasks will always come with the trade-off that you can > > > > > > over-provision task workers; it seems fundamental. I think it's a > > > > > > trade-off that users of this feature will make for themselves, and > > > > > > both methods will be supported. > > > > > > > > > > > > 5. No, I don't think that it is a safe assumption, as there are > > > > > > certainly connectors out there which violate it. For example, sink > > > > > > connectors with a heterogeneous set of topics, and source connectors > > > > > > which distribute a dynamic heterogeneous workload internally. I > > > > > > believe there are a few connectors with a distinct "task-0" which > > > > > > consumes more resources than the other tasks which would have some > > > > > > index-stability. Index is chosen because it is the only > > > > > > distinguishing > > > > > > feature we have without inspecting task configurations, and have > > > > > > control over via coordination and assignments. > > > > > > > > > > > > Thanks again! > > > > > > Greg > > > > > > > > > > > > > > > > > > On Tue, Oct 10, 2023 at 8:05 AM Mickael Maison > > <mickael.mai...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > Thanks for the KIP! > > > > > > > > > > > > > > 1) With this mechanism it seems Connect administrators have to > > > > > > > know in > > > > > > > advance the names of connectors that will be running. 
As far as > > > > > > > I can > > > > > > > tell, static.connectors and static.tasks will require restarting > > > > > > > the > > > > > > > worker to change. Also, you don't specify if these configurations > > > > > > > can > > > > > > > accept wildcards or not. This could be useful for tasks as it's > > > > > > > hard > > > > > > > to know in advance how many tasks will exist, especially as > > > > > > > connectors > > > > > > > can in theory ignore tasks.max (I wonder if we should make the > > > > > > > runtime > > > > > > > enforce that config). > > > > > > > > > > > > > > 2) The KIP does not explicitly mention a mechanism for management > > > > > > > systems to figure out what to do. As far as I can tell (and > > > > > > > looking at > > > > > > > the proposal in Strimzi), these systems will have to poll the > > > > > > > REST API > > > > > > > to do so. If I understand correctly, management systems would > > > > > > > first > > > > > > > have to poll the GET /connectors and then GET > > > > > > > /connectors/{name}/tasks > > > > > > > for each connector, is that right? > > > > > > > > > > > > > > 3) In the Strimzi proposal you mention adding custom fields to the > > > > > > > Connector resource to instruct it what to do. I wonder if these > > > > > > > should > > > > > > > be added directly in the connector configuration, rather than just > > > > > > > existing in the management system. That way all the data necessary to > > > > > > > make decisions would be available in the same place, the REST API. > > > > > > > > > > > > > > 4) This is not something you mention in the KIP and I don't think > > > > > > > it's > > > > > > > a blocker for this KIP but I wonder if you thought about rack > > > > > > > awareness too. I'm bringing it up as I think it's a similar concern > > > > > > > when > > > > > > > it comes to deciding where to assign tasks. 
> > > > > > > > > > > > > > Thanks, > > > > > > > Mickael > > > > > > > > > > > > > > > > > > > > > On Tue, Oct 10, 2023 at 8:39 AM Tom Bentley <tbent...@redhat.com> > > > > > wrote: > > > > > > > > > > > > > > > > Hi Greg, > > > > > > > > > > > > > > > > Many thanks for the KIP. Here are a few initial questions > > > > > > > > > > > > > > > > 1. Incomplete sentence: "But Connect is not situated to be able > > > > > > > > to > > > > > manage > > > > > > > > resources directly, as workers are given a fixed " > > > > > > > > 2. You explain how sessioned is now a subset of static, but what > > > > > happens in > > > > > > > > a cluster where some workers are using static and some are using > > > > > either > > > > > > > > eager or compatible? > > > > > > > > 3. "Assign each unassigned static job to a static worker which > > > > > specifies > > > > > > > > that job, choosing arbitrarily if there are multiple valid > > > > > > > > workers." > > > > > I > > > > > > > > think there might be less ambiguous words than "arbitrary" to > > > > > specify this > > > > > > > > behaviour. Hashing the task name would _appear_ pretty > > > > > > > > arbitrary to > > > > > the > > > > > > > > user, but would be deterministic. Picking at random would be > > > > > > > > non-deterministic. Even better if you have a rationale. > > > > > > > > 4. You don't describe how a user, or automated system, starting > > > > > > > > with > > > > > a set > > > > > > > > of connectors, should find out the tasks that they want to run. > > > > > > > > This > > > > > > > > relates to the contract of > > > > > > > > org.apache.kafka.connect.connector.Connector#taskConfigs(int > > > > > maxTasks). 
AFAIK (please correct me if I'm wrong, because it's a long time > > > > > since I > > > > > > > > looked at this code) there's nothing that validates that the > > > > > returned list > > > > > > > > has at most `maxTasks` entries, and connectors can, of course, return > > > > > fewer than > > > > > > > > that many tasks. So without deeper knowledge of a particular > > > > > connector it's > > > > > > > > not clear to the user/operator how to configure their static > > > > > > > > workers > > > > > and > > > > > > > > static assignments. > > > > > > > > 5. Is there a lurking assumption that task indices are stable? > > > > > > > > E.g. > > > > > that > > > > > > > > the task with index 3 will always be the resource-intensive > > > > > > > > one. I > > > > > can see > > > > > > > > that that would often be a reliable assumption, but it's not > > > > > > > > clear > > > > > to me > > > > > > > > that it is always the case. > > > > > > > > > > > > > > > > Thanks again, > > > > > > > > > > > > > > > > Tom > > > > > > > > > > > > > > > > On Fri, 6 Oct 2023 at 12:36, Greg Harris > > > > > <greg.har...@aiven.io.invalid> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hey everyone! > > > > > > > > > > > > > > > > > > I'd like to propose an improvement to Kafka Connect's > > > > > > > > > scheduling > > > > > > > > > algorithm, with the goal of improving the operability of > > > > > > > > > Connect > > > > > > > > > clusters through resource isolation. > > > > > > > > > > > > > > > > > > Find the KIP here: > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments > > > > > > > > > > > > > > > > > > This feature is primarily intended to be consumed by cluster > > > > > > > > > management systems, so I've opened a sister proposal in the > > Strimzi > > > > > > > > > project that uses this feature to provide user-facing resource > > > > > > > > > isolation. 
The base feature is generic, so it is usable > > > > > > > > > manually > > > > > and > > > > > > > > > with cluster management systems other than Strimzi. > > > > > > > > > > > > > > > > > > Find the proposal here: > > > > > https://github.com/strimzi/proposals/pull/96 > > > > > > > > > > > > > > > > > > Thanks! > > > > > > > > > Greg Harris > > > > > > > > > > > > > > > > > > > > > > > > > > >
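As a concrete illustration of Mickael's placement/tags idea from earlier in the thread, a connector configuration (shown here in standalone properties form) might look like the sketch below. The placement and tags fields are only a proposal from this discussion, not part of any accepted KIP, and all names and values are hypothetical:

```properties
# Hypothetical connector config carrying the proposed placement hints.
# 'placement' would take one of: any | colocated | isolated.
# 'tags' would be arbitrary key=value pairs interpreted by the
# management system (e.g. Strimzi), not by Connect itself.
name=inventory-source
connector.class=com.example.InventorySourceConnector
tasks.max=4
placement=isolated
tags=team=payments,tier=gold
```

With a config like this, the runtime would only assign the connector's tasks to workers matching the placement and tags, leaving them unassigned until the management system provisions suitable workers.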