I still find the proposed mechanism limited and I'm not sure it really
addresses the pain points I've experienced with Connect.
As you said, different tasks from a connector may have different
workloads. Connectors may also change the assignment of tasks at
runtime, so for example if task-2 is really busy (because it's assigned
a partition with high throughput), this may not be true in 10 minutes
once that partition has been reassigned to task-1. So having to specify
which tasks can run on each worker does not really help in this case.

I think the "hints" about where to place a connector/tasks should come
from the connector configuration, as the engineers building a pipeline
are the ones who know best the requirements (in terms of isolation,
resources) of their workload. This is basically point 3) in my initial
email. The mechanism you propose puts this burden on the cluster
administrators, who may well not know the workloads and also have to
guess/know in advance to properly configure workers.

I've not looked into the feasibility but I wonder if a simplified
taint/selector approach could give us enough flexibility to make
Connect behave better in containerized environments. I understand it's
an alternative you rejected, but I think it could have some benefits.
Here is my green-field thinking:
Add 2 new fields in the connector config: placement and tags.
Placement defines the degree of isolation a task requires. It accepts 3
values: any (can be placed anywhere like today, the default),
colocated (can run on a worker with other tasks from this connector),
isolated (requires a dedicated worker). I think these degrees of
isolation should cover most use cases. Tags accepts a collection of
key=value pairs. These can have arbitrary values and are meant to mean
something to the management system (for example Strimzi). The accepted
values could be configured on the workers by the administrators, as
they also operate the management system.

When a connector is created, the runtime tries to place tasks on the
available workers by matching the placement and tags. If no suitable
workers are found, the tasks stay in the unassigned state and the runtime
waits for the management system to create the necessary workers.
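A minimal sketch of how the runtime could match tasks against these two fields, written in Java since that is what Connect is built with. Everything here is hypothetical: the `Placement` enum, the comma-separated `key=value` encoding of tags, the reading of colocated as "shares a worker only with tasks of the same connector", and the least-loaded tie-break are assumptions for illustration, not an existing Connect API. An empty result stands for the task staying unassigned until the management system creates a suitable worker. Requires Java 17 or later.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PlacementSketch {

    // Hypothetical values of the proposed "placement" connector config.
    enum Placement { ANY, COLOCATED, ISOLATED }

    // A task already running on a worker, with the placement of its connector.
    record Assigned(String connector, Placement placement) {}

    // Hypothetical view of a worker: tags set by the administrator and its current tasks.
    record Worker(String id, Map<String, String> tags, List<Assigned> tasks) {}

    // Parses a hypothetical "tags" value such as "tier=gpu,zone=a".
    static Map<String, String> parseTags(String raw) {
        Map<String, String> tags = new HashMap<>();
        if (raw == null || raw.isBlank()) {
            return tags;
        }
        for (String pair : raw.split(",")) {
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("expected key=value, got: " + pair);
            }
            tags.put(kv[0].trim(), kv[1].trim());
        }
        return tags;
    }

    // Checks the isolation rules for adding a task of `connector` to worker `w`.
    static boolean placementAllows(Worker w, String connector, Placement placement) {
        for (Assigned t : w.tasks()) {
            // Nothing may join a worker dedicated to an isolated task.
            if (t.placement() == Placement.ISOLATED) {
                return false;
            }
            // A colocated connector shares its workers only with its own tasks.
            if (t.placement() == Placement.COLOCATED && !t.connector().equals(connector)) {
                return false;
            }
        }
        return switch (placement) {
            case ANY -> true;
            case COLOCATED -> w.tasks().stream().allMatch(t -> t.connector().equals(connector));
            case ISOLATED -> w.tasks().isEmpty();
        };
    }

    // Returns a worker carrying every required tag and satisfying the placement,
    // preferring the least loaded one; empty means the task stays unassigned.
    static Optional<Worker> place(List<Worker> workers, String connector,
                                  Placement placement, Map<String, String> requiredTags) {
        return workers.stream()
                .filter(w -> w.tags().entrySet().containsAll(requiredTags.entrySet()))
                .filter(w -> placementAllows(w, connector, placement))
                .min(Comparator.comparingInt((Worker w) -> w.tasks().size())
                        .thenComparing(Worker::id));
    }

    static List<Worker> demoWorkers() {
        return List.of(
                new Worker("w1", Map.of("tier", "gpu"), List.of(new Assigned("other", Placement.ANY))),
                new Worker("w2", Map.of("tier", "gpu"), List.of()),
                new Worker("w3", Map.of("tier", "cpu"), List.of()));
    }

    public static void main(String[] args) {
        Optional<Worker> chosen = place(demoWorkers(), "my-sink", Placement.ISOLATED, parseTags("tier=gpu"));
        System.out.println(chosen.map(Worker::id).orElse("unassigned")); // prints "w2"
        Optional<Worker> none = place(demoWorkers(), "my-sink", Placement.ANY, parseTags("tier=tpu"));
        System.out.println(none.map(Worker::id).orElse("unassigned")); // prints "unassigned"
    }
}
```

With only the placement field (no tags), `requiredTags` is simply an empty map and every worker passes the tag filter.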

We could even envisage starting with only the placement field, as in my
opinion this is what brings the most value to users.


On Wed, Oct 18, 2023 at 8:12 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:
> Hey Sagar,
> Thanks for the questions. I hope you find the answers satisfying:
> 1. This is detailed in the KIP two sentences earlier: "If the
> connect.protocol is set to static, each worker will send it's
> static.connectors and static.tasks to the coordinator during
> rebalances."
> 2. If you have a static worker and a wildcard worker, the static
> worker will be assigned the work preferentially. If the static worker
> goes offline, the wildcard worker will be used as a backup.
> 3. I don't think that new Connect users will make use of this feature,
> but I've added that clarification.
> 4. Users can implement the strategy you're describing by leaving the
> static.connectors field unset. I think that Connect should include
> static.connectors for users that do want to control the placement of
> connectors.
> 5. Yes. Arbitrary here just means that the assignment is not
> influenced by the static assignment.
> 6. Yes. There are no guardrails that ensure that the balance of the
> static assignments is better than the built-in algorithm because we
> have no method to compare them.
> 7. If the whole cluster uses static assignments with each job only
> specified on one worker, the assignments are completely sticky. If a
> worker goes offline, those tasks will be offline until that worker
> comes back.
> If there are multiple workers for a single job, that is specified as
> "arbitrary". We could choose to wait for the delay to elapse or
> immediately reassign it, the KIP as written could be implemented by
> either.
> If the assignment would land on a wildcard worker, that should use
> cooperative rules, so we would need to respect the rebalance delay.
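The preference rule in answers 2 and 7 can be sketched as a per-job choice made by the leader. This is a minimal Java sketch, not the KIP's algorithm: treating a worker with no static entries as a wildcard worker, the `load` field, and the least-loaded tie-break are assumptions (the KIP deliberately leaves the choice "arbitrary"), and rebalance delays are ignored. Requires Java 17 or later.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class StaticAssignmentSketch {

    // Hypothetical leader-side view of a live worker: the jobs named in its
    // static.connectors/static.tasks (empty for a wildcard worker) and its current load.
    record Worker(String id, Set<String> staticJobs, int load) {
        boolean isWildcard() {
            return staticJobs.isEmpty();
        }
    }

    // Least loaded first, worker ID as the final tie-break (an assumption; the KIP says "arbitrary").
    private static final Comparator<Worker> LEAST_LOADED =
            Comparator.comparingInt(Worker::load).thenComparing(Worker::id);

    // Static workers that name the job win; wildcard workers are the backup;
    // an empty result means the job stays offline until a suitable worker returns.
    static Optional<Worker> choose(String job, List<Worker> liveWorkers) {
        Optional<Worker> staticMatch = liveWorkers.stream()
                .filter(w -> w.staticJobs().contains(job))
                .min(LEAST_LOADED);
        if (staticMatch.isPresent()) {
            return staticMatch;
        }
        return liveWorkers.stream().filter(Worker::isWildcard).min(LEAST_LOADED);
    }

    public static void main(String[] args) {
        Worker pinned = new Worker("static-1", Set.of("my-sink-0"), 5);
        Worker spare = new Worker("wildcard-1", Set.of(), 3);

        // Preferred: the static worker naming the job, even though it is busier.
        System.out.println(choose("my-sink-0", List.of(pinned, spare)).map(Worker::id).orElse("unassigned")); // static-1
        // A job that no worker names goes to a wildcard worker.
        System.out.println(choose("my-source-0", List.of(pinned, spare)).map(Worker::id).orElse("unassigned")); // wildcard-1
        // The static worker is offline: the wildcard worker acts as the backup.
        System.out.println(choose("my-sink-0", List.of(spare)).map(Worker::id).orElse("unassigned")); // wildcard-1
        // No wildcard worker either: the job waits for its static worker to come back.
        System.out.println(choose("my-sink-0", List.of()).map(Worker::id).orElse("unassigned")); // unassigned
    }
}
```

The last case corresponds to a fully static cluster where each job is named on a single worker: the assignment is sticky and the job is offline until that worker rejoins.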
> Thanks!
> Greg
> On Wed, Oct 18, 2023 at 6:21 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > Hi Greg,
> >
> > Thanks for the KIP. I have a few questions/comments:
> >
> > 1) You mentioned that during a rebalance if all the members of the cluster
> > support the static protocol, then it would use the steps outlined in the
> > Proposed Section to do the assignments. In those steps, the leader
> > identifies the static/wildcard jobs. It is not clear to me how the leader
> > makes that distinction. Are we going to enhance the embedded protocol to
> > also write the static jobs that the worker owns as part of its assignment?
> > As of today, the workers only write the owned/revoked connectors/tasks
> > in case of incremental and above, and only the owned connectors/tasks in
> > case of eager.
> >
> > 2) Could you also elaborate this statement a bit:
> >
> > > A cluster with both static and wildcard workers can use wildcard workers
> > > as backups for disaster recovery.
> >
> >
> > I see in the Strimzi proposal you have explained this scenario (the case
> > where shared workers are empty as pods aren't created yet etc IIUC), but
> > reading the KIP doesn't make it too obvious.
> >
> > 3) A nit comment but we should probably call out that there is no need to
> > assign the connector and task to the same worker when configuring them. I
> > guess for new users of Connect this could be a source of confusion. WDYT?
> >
> > 4) Staying on the above, I also wanted to know why we need to set
> > static.connectors. Connectors generally aren't resource intensive.
> > Can we not let connectors be assigned using the same algorithm as today and
> > let only tasks be pinned to workers?
> >
> > 5) In the proposed section you also mentioned that
> >
> > If static assignments are not specified, or at least one worker in the
> > > cluster is not using the static  protocol, they are ignored and the
> > > worker may receive an arbitrary assignment.
> >
> >
> > The arbitrary assignment is driven by the minimum supported protocol
> > (sessioned/eager), right?
> >
> > 6) Just for my understanding, because now the onus is on the connect admin
> > to specify which connector/task should be running where, can there be a
> > situation where there could be an imbalance in terms of number of
> > connectors/tasks running on workers (due to some oversight on part of admin
> > or some bug in the tooling used to generate worker configs if any) ? There
> > could be an imbalance even today as well but the protocol tries to balance
> > as much as it can in an automated fashion.
> >
> > 7) Lastly, in a cluster where all workers support the static protocol,
> > there is still no guarantee that all connectors/tasks would be static. In
> > such a case, what happens when one of the workers is restarted or is being
> > rolled? Would the assignment eventually be sticky when the worker comes
> > back? Does this new protocol also abide by the scheduled rebalance delays?
> > Maybe this has been explained in the KIP but it wasn't clear to me when I
> > read it.
> >
> > Thanks!
> > Sagar.
> >
> > On Tue, Oct 10, 2023 at 11:22 PM Greg Harris <greg.har...@aiven.io.invalid> wrote:
> >
> > > Hi Mickael,
> > >
> > > I'm not Chris but I hope I can still respond to your questions :)
> > >
> > > 1a. This system behaves best when connectors and tasks are known in
> > > advance, but I don't think that is a requirement. I clarified that the
> > > worker configurations are permissive of non-existent jobs, which
> > > allows for bootstrapping an empty cluster and concurrent worker & job
> > > provisioning.
> > > 1b. I think the wildcard suggestion is similar to the rejected
> > > alternative "implement a more complex worker selector...". While we
> > > could add that sort of functionality, I think it is more difficult to
> > > express and reason about. For example, with the current configuration,
> > > users can ensure that at most one job is assigned to a JVM. If a user
> > > wanted to use wildcards with the same constraint, those wildcards
> > > would need to express "at most one of <matching job>".
> > > 1c. The javadoc doesn't say what happens to the additional tasks, but
> > > I think we could start enforcing it if it makes the
> > > fixed-static-assignment use-case more reliable. I have only ever seen
> > > additional tasks emitted due to rather serious bugs in connector
> > > implementations.
> > >
> > > 2. Yes, I've added that information to the proposal. I used the `GET
> > > /connectors/{connector}` API as it doesn't list the task configs, but
> > > it performs the same function.
> > >
> > > 3. Do you mean the resource limits and requests? This is the rejected
> > > alternative "Model per-job resource constraints and measure resource
> > > utilization within a single JVM", let me know if the reasoning there
> > > is not convincing.
> > >
> > > 4. I have not explored rack awareness for Connect in detail. I expect
> > > that rack awareness would require some compromise on the "least
> > > disruption" property of incremental cooperative rebalancing for shared
> > > JVMs. We could discuss that in a future KIP.
> > > As for this proposal, because the management layer is responsible for
> > > placing workers and placing tasks on those workers, it would also be
> > > responsible for implementing rack-awareness in isolated JVMs.
> > >
> > > Thanks a lot!
> > > Greg
> > >
> > > On Tue, Oct 10, 2023 at 10:08 AM Greg Harris <greg.har...@aiven.io> wrote:
> > > >
> > > > Hey Tom,
> > > >
> > > > Thanks for your comments! I appreciate your insight here and on the
> > > > Strimzi proposal.
> > > >
> > > > 1. Fixed, thanks.
> > > >
> > > > 2. We haven't removed a protocol yet, so workers still announce that
> > > > they can use the old protocol versions, and the group coordinator will
> > > > choose the highest mutually supported protocol version. This would be
> > > > true even if static was not a superset of sessioned.
> > > > For the static feature, having workers in the cluster that do not
> > > > support static assignments will cause the common version to not
> > > > support static assignments, and will mean that workers which announce
> > > > static.connectors and static.tasks may be assigned jobs outside of
> > > > their static assignments, which is called out in the KIP.
> > > > If the version gap is wider, then the other protocol-controlled
> > > > features will also change behavior (i.e. no longer using
> > > > incremental-cooperative-assignments, or not including REST
> > > > authentication headers). There would be no difference to the behavior
> > > > of this feature.
> > > >
> > > > 3. Arbitrary here is specifically meant to say that one of the workers
> > > > will be chosen, but outside implementations should not rely on the
> > > > method of choosing. This allows us to change/improve the assignment
> > > > algorithm in the future without a KIP. The reason I've specified it
> > > > this way is because I'd like to choose the "least loaded" worker, but
> > > > I don't know if that is well-defined or tractable to compute. I expect
> > > > that in practice we will use some heuristics to tie-break, but won't
> > > > be able to ensure that the algorithm will be deterministic and/or
> > > > optimal. If we find an improvement to the heuristics, I think that we
> > > > should be able to implement them without a KIP.
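For comparison with "arbitrary", the deterministic option of hashing the job name can be sketched in a few lines of Java. This is an illustration only: `pick` is a hypothetical helper, not part of the KIP, and it relies on `String.hashCode()`, whose algorithm is specified in the Java documentation and is therefore stable across JVMs. Sorting the candidates first makes the result independent of the order in which workers are listed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HashTieBreakSketch {

    // Deterministically picks one of several valid workers for a job: the same job name
    // and the same set of candidate IDs always give the same answer.
    static String pick(String jobName, List<String> candidateWorkerIds) {
        if (candidateWorkerIds.isEmpty()) {
            throw new IllegalArgumentException("no valid worker for " + jobName);
        }
        // Sort a copy so the result does not depend on the order candidates were listed in.
        List<String> sorted = new ArrayList<>(candidateWorkerIds);
        Collections.sort(sorted);
        // String.hashCode() is specified, so this is stable across JVMs and restarts;
        // floorMod keeps the index non-negative for negative hash codes.
        int index = Math.floorMod(jobName.hashCode(), sorted.size());
        return sorted.get(index);
    }

    public static void main(String[] args) {
        String first = pick("my-sink-2", List.of("worker-b", "worker-a", "worker-c"));
        String again = pick("my-sink-2", List.of("worker-c", "worker-a", "worker-b"));
        System.out.println(first + " " + first.equals(again)); // second token is always "true"
    }
}
```

One trade-off: a plain modulo can move a job whenever the candidate set changes, which works against the "least disruption" goal of incremental cooperative rebalancing; rendezvous or consistent hashing would limit that churn to jobs on the workers that actually left.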
> > > >
> > > > 4. I added a section for "Choosing static assignments" that details
> > > > both the "dynamic" and "fixed" use-cases. With regards to tasks.max:
> > > > The javadoc for the `taskConfigs` method does state that it
> > > > "produc[es] at most {@code maxTasks} configurations," so static
> > > > inference of the task IDs is valid. I think that statically inferring
> > > > the number of tasks will always come with the trade-off that you can
> > > > over-provision task workers, it seems fundamental. I think it's a
> > > > trade-off that users of this feature will make for themselves, and
> > > > both methods will be supported.
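The static inference of task IDs from `tasks.max` described in point 4 can be sketched as below. The `<connector>-<index>` naming and both helpers are assumptions for illustration; the exact `static.tasks` syntax is defined by the KIP, not here. `enforceMaxTasks` shows one possible runtime guard (truncation) for connectors that break the `taskConfigs` contract; failing the connector would be an equally valid policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TaskIdInferenceSketch {

    // If taskConfigs(maxTasks) honours its contract, task indices 0..tasksMax-1 cover every
    // task the connector can create, so they can be derived from tasks.max alone.
    static List<String> inferTaskIds(String connector, int tasksMax) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < tasksMax; i++) {
            ids.add(connector + "-" + i); // hypothetical "<connector>-<index>" naming
        }
        return ids;
    }

    // One possible runtime guard for connectors that break the contract:
    // keep only the first maxTasks configurations.
    static List<Map<String, String>> enforceMaxTasks(List<Map<String, String>> configs, int maxTasks) {
        if (configs.size() <= maxTasks) {
            return configs;
        }
        return List.copyOf(configs.subList(0, maxTasks));
    }

    public static void main(String[] args) {
        // tasks.max=3 yields three static slots even if the connector only creates two tasks,
        // which is the over-provisioning trade-off described above.
        System.out.println(inferTaskIds("my-sink", 3)); // [my-sink-0, my-sink-1, my-sink-2]
        List<Map<String, String>> produced =
                List.of(Map.of("topic", "a"), Map.of("topic", "b"), Map.of("topic", "c"));
        System.out.println(enforceMaxTasks(produced, 2).size()); // 2
    }
}
```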
> > > >
> > > > 5. No, I don't think that it is a safe assumption, as there are
> > > > certainly connectors out there which violate it. For example, sink
> > > > connectors with a heterogeneous set of topics, and source connectors
> > > > which distribute a dynamic heterogeneous workload internally. I
> > > > believe there are a few connectors with a distinct "task-0" which
> > > > consumes more resources than the other tasks which would have some
> > > > index-stability. Index is chosen because it is the only distinguishing
> > > > feature we have without inspecting task configurations, and have
> > > > control over via coordination and assignments.
> > > >
> > > > Thanks again!
> > > > Greg
> > > >
> > > >
> > > > On Tue, Oct 10, 2023 at 8:05 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >
> > > > > Hi Chris,
> > > > >
> > > > > Thanks for the KIP!
> > > > >
> > > > > 1) With this mechanism it seems Connect administrators have to know in
> > > > > advance the names of connectors that will be running. As far as I can
> > > > > tell, static.connectors and static.tasks will require restarting the
> > > > > worker to change. Also you don't specify if these configurations can
> > > > > accept wildcards or not. This could be useful for tasks as it's hard
> > > > > to know in advance how many tasks will exist, especially as connectors
> > > > > can in theory ignore tasks.max (I wonder if we should make the runtime
> > > > > enforce that config).
> > > > >
> > > > > 2) The KIP does not explicitly mention a mechanism for management
> > > > > systems to figure out what to do. As far as I can tell (and looking at
> > > > > the proposal in Strimzi), these systems will have to poll the REST API
> > > > > to do so. If I understand correctly, management systems would first
> > > > > have to poll the GET /connectors and then GET /connectors/{name}/tasks
> > > > > for each connector, is that right?
> > > > >
> > > > > 3) In the Strimzi proposal you mention adding custom fields to the
> > > > > Connector resource to instruct it what to do. I wonder if these should
> > > > > be added directly in the connector configuration, rather than just
> > > > > exist in the management system. That way all the data necessary to
> > > > > make decisions would be available in the same place, the REST API.
> > > > >
> > > > > 4) This is not something you mention in the KIP and I don't think it's
> > > > > a blocker for this KIP but I wonder if you thought about rack
> > > > > awareness too. I'm bringing it up as I think it's a similar concern when
> > > > > it comes to deciding where to assign tasks.
> > > > >
> > > > > Thanks,
> > > > > Mickael
> > > > >
> > > > >
> > > > > On Tue, Oct 10, 2023 at 8:39 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > > > >
> > > > > > Hi Greg,
> > > > > >
> > > > > > Many thanks for the KIP. Here are a few initial questions:
> > > > > >
> > > > > > 1. Incomplete sentence: "But Connect is not situated to be able to
> > > > > > manage resources directly, as workers are given a fixed "
> > > > > > 2. You explain how sessioned is now a subset of static, but what
> > > > > > happens in a cluster where some workers are using static and some
> > > > > > are using either eager or compatible?
> > > > > > 3. "Assign each unassigned static job to a static worker which
> > > > > > specifies that job, choosing arbitrarily if there are multiple valid
> > > > > > workers." I think there might be less ambiguous words than
> > > > > > "arbitrary" to specify this behaviour. Hashing the task name would
> > > > > > _appear_ pretty arbitrary to the user, but would be deterministic.
> > > > > > Picking at random would be non-deterministic. Even better if you
> > > > > > have a rationale.
> > > > > > 4. You don't describe how a user, or automated system, starting with
> > > > > > a set of connectors, should find out the tasks that they want to
> > > > > > run. This relates to the contract of
> > > > > > org.apache.kafka.connect.connector.Connector#taskConfigs(int maxTasks).
> > > > > > AFAIK (please correct me if I'm wrong, because it's a long time since
> > > > > > I looked at this code) there's nothing that validates that the
> > > > > > returned list has at most `maxTasks` entries, and connectors can, of
> > > > > > course, return fewer than that many tasks. So without deeper
> > > > > > knowledge of a particular connector it's not clear to the
> > > > > > user/operator how to configure their static workers and static
> > > > > > assignments.
> > > > > > 5. Is there a lurking assumption that task indices are stable? E.g.
> > > > > > that the task with index 3 will always be the resource-intensive
> > > > > > one. I can see that that would often be a reliable assumption, but
> > > > > > it's not clear to me that it is always the case.
> > > > > >
> > > > > > Thanks again,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > > On Fri, 6 Oct 2023 at 12:36, Greg Harris <greg.har...@aiven.io.invalid> wrote:
> > > > > >
> > > > > > > Hey everyone!
> > > > > > >
> > > > > > > I'd like to propose an improvement to Kafka Connect's scheduling
> > > > > > > algorithm, with the goal of improving the operability of Connect
> > > > > > > clusters through resource isolation.
> > > > > > >
> > > > > > > Find the KIP here:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments
> > > > > > >
> > > > > > > This feature is primarily intended to be consumed by cluster
> > > > > > > management systems, so I've opened a sister proposal in the Strimzi
> > > > > > > project that uses this feature to provide user-facing resource
> > > > > > > isolation. The base feature is generic, so it is usable manually and
> > > > > > > with cluster management systems other than Strimzi.
> > > > > > >
> > > > > > > Find the proposal here:
> > > > > > > https://github.com/strimzi/proposals/pull/96
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Greg Harris