Hi Levani,

1. Thanks for the details.

I figured it must be something like this two-dimensional definition of "rack".

It does seem like, if we make the config take a list of tags, we can define
the semantics to be that the system will make a best effort to distribute
the standbys over each rack dimension.

In your example, there are two clusters and three AZs. The example
configs would be:

Node 1:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1a
task.assignment.awareness: cluster,zone

Node 2:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1b
task.assignment.awareness: cluster,zone

Node 3:
instance.tag.cluster: K8s_Cluster1
instance.tag.zone: eu-central-1c
task.assignment.awareness: cluster,zone

Node 4:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1a
task.assignment.awareness: cluster,zone

Node 5:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1b
task.assignment.awareness: cluster,zone

Node 6:
instance.tag.cluster: K8s_Cluster2
instance.tag.zone: eu-central-1c
task.assignment.awareness: cluster,zone
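
For illustration, here is a rough sketch of what Node 1's configuration
could look like in code. To be clear, the config names are just the ones
proposed in this thread (not finalized), and the class name, application
id, and bootstrap servers are placeholders I made up:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class Node1Config {

    public static Properties node1Props() {
        Properties props = new Properties();
        // Regular Streams configs (placeholder values)
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Free-form tags describing where this instance runs
        // (config names as proposed in this thread, subject to change)
        props.put("instance.tag.cluster", "K8s_Cluster1");
        props.put("instance.tag.zone", "eu-central-1a");
        // The tag dimensions the assignor should try to spread
        // standby tasks over
        props.put("task.assignment.awareness", "cluster,zone");
        return props;
    }
}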


Now, if we have a task 0_0 with an active and two replicas,
there are three total copies of the task to distribute over:
* 6 instances
* 2 clusters
* 3 zones

There is a constraint that we _cannot_ assign two copies of a task
to a single instance, but it seems like the default rack awareness
would permit us to assign two copies of a task to a rack, if (and only
if) the number of copies is greater than the number of racks.
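
Just to make that rule concrete, here is a toy sketch (not the actual
assignor code; pickClient and all the parameter names are made up) of how
one dimension could be filled in a best-effort way: never reuse an
instance, prefer an unused tag value, and only fall back to reusing a
value once every distinct value has been taken:

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class BestEffortPick {

    // Picks a client for the next copy of a task along one tag dimension.
    // Hard constraint: never place two copies on the same client.
    // Soft constraint: prefer a client whose tag value is not used yet;
    // reuse a value only when copies outnumber the distinct values.
    public static Optional<String> pickClient(Map<String, String> clientToTagValue,
                                              Set<String> alreadyAssignedClients,
                                              Set<String> usedTagValues) {
        List<String> candidates = clientToTagValue.keySet().stream()
            .filter(client -> !alreadyAssignedClients.contains(client))
            .collect(Collectors.toList());
        return candidates.stream()
            .filter(client -> !usedTagValues.contains(clientToTagValue.get(client)))
            .findFirst()
            // All tag values are exhausted: permit reuse rather than fail.
            .or(() -> candidates.stream().findFirst());
    }
}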

So, the assignment we would get is like this:
* assigned to three different instances
* one copy in each of zone a, b, and c
* two copies in one cluster and one in the other cluster

For example, we might have 0_0 assigned to:
* Node 1 (cluster 1, zone a)
* Node 5 (cluster 2, zone b)
* Node 3 (cluster 1, zone c)

Is that what you were also thinking?

Thanks,
-John

On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
> Hi John,
> 
> 1. The main reason was that it seemed like an easier change compared to
> having multiple tags assigned to each host.
> 
> ---
> 
> Answering your question what use-case I have in mind:
> Let’s say we have two Kubernetes clusters running the same Kafka Streams
> application, and each Kubernetes cluster spans multiple AZs.
> So the setup overall looks something like this:
> 
> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
> 
> Now, if the Kafka Streams application is launched in K8s_Cluster1:
> eu-central-1a, ideally I would want the standby task to be created in a
> different K8s cluster and region.
> So in this example it can be K8s_Cluster2: [eu-central-1b, eu-central-1c].
> 
> But giving it a bit more thought, this can be implemented if we change 
> semantics of “tags” a bit.
> So instead of doing full match with tags, we can do iterative matching 
> and it should work.
> (If this is what you had in mind, apologies for the misunderstanding).
> 
> If we consider the same example as mentioned above, for the active task
> we would have the following tags: [K8s_Cluster1, eu-central-1a]. In order
> to distribute the standby task in a different K8s cluster, plus in a
> different AWS region, the standby task assignment algorithm can compare
> each tag by index. So the steps would be something like:
> 
> // this will result in selecting client in the different K8s cluster
> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
> // this will result in selecting the client in different AWS region
> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != clientsInDifferentCluster[1])
> 
> WDYT?
> 
> If you agree with the use-case I’ve mentioned, the pluggable assignor
> can be deferred to another KIP, yes, as it won’t be required for this
> KIP and the use-cases I had in mind to work.
> 
> Regards,
> Levani 
> 
> 
> > On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org> wrote:
> > 
> > Hello Levani,
> > 
> > Thanks for the reply. 
> > 
> > 1. Interesting; why did you change your mind?
> > 
> > I have a gut feeling that we can achieve pretty much any rack awareness 
> > need that people have by using purely config, which is obviously much 
> > easier to use. But if you had a case in mind where this wouldn’t work, it 
> > would be good to know. 
> > 
> > In fact, if that is true, then perhaps you could just defer the whole idea 
> > of a pluggable interface (point 2) to a separate KIP. I do think a 
> > pluggable assignor would be extremely valuable, but it might be nice to cut 
> > the scope of KIP-708 if just a config will suffice.
> > 
> > What do you think?
> > Thanks,
> > John
> > 
> > 
> > On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
> >> Hi John,
> >> 
> >> Thanks a lot for thorough feedback, it’s really valuable.
> >> 
> >> 1. Agree with this. Had the same idea initially.
> >> We can set an upper limit on the number of tags users can set to make
> >> sure it’s not overused. By default, we can create standby tasks whose
> >> tags are different from the active task’s (full match).
> >> This should mimic the default rack awareness behaviour.
> >> 
> >> 2. I like the idea and I’d be happy to work on 
> >> refactoring TaskAssignor to accommodate rack awareness use-case. 
> >> When I was going through the code, it felt way more natural
> >> to use a pluggable TaskAssignor for achieving rack awareness
> >> instead of introducing a new interface and contract.
> >> But I thought the approach mentioned in the KIP was simpler, so I
> >> decided to move forward with it as an initial proposal :).
> >> But I agree with you, it will be much better if we can have
> >> TaskAssignor as a pluggable interface users can use.
> >> One potential challenge I see with this is that, if we just let
> >> users implement TaskAssignor in its current form, we will be forcing
> >> users to implement functionality for active task assignment as well as
> >> standby task assignment. This doesn’t feel like a very clear contract,
> >> because with just the TaskAssignor interface it’s not really clear that
> >> one needs to allocate standby tasks as well. We can enforce it to some
> >> extent with the return object you’ve mentioned TaskAssignor#assign has
> >> to return, but it still feels error-prone.
> >> In addition, I suspect in most of the cases users would want
> >> to control standby task assignment and leave active task assignment as 
> >> is. 
> >> To make implementation of standby task assignment easier for users, 
> >> what if
> >> we decouple active and standby task assignment from the `TaskAssignor`?
> >> The idea I have in mind is to split TaskAssignor into ActiveTaskAssignor
> >> and StandbyTaskAssignor, and let users add their own implementations
> >> for them separately via config if they like.
> >> 
> >> If this approach sounds reasonable, I’ll work on updating KIP this week. 
> >> 
> >> Thanks,
> >> Levani
> >> 
> >>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org> wrote:
> >>> 
> >>> Thanks, Levani!
> >>> 
> >>> I was reflecting more on your KIP last night.
> >>> 
> >>> One thing I should mention is that I have previously used
> >>> the rack awareness feature of Elasticsearch, and found it to
> >>> be pretty intuitive and also capable of what we needed in
> >>> our AWS clusters. As you look at related work, you might
> >>> take ES into consideration.
> >>> 
> >>> I also had some thoughts about your proposal.
> >>> 
> >>> 1. I'm wondering if we instead allow people to add arbitrary
> >>> tags to each host, and then have a configuration to specify
> >>> a combination of tags to use for rack awareness. This seems
> >>> easier to manage than the use case you anticipate, where
> >>> people would concatenate rackId = (clusterId + AZ) and then
> >>> have to parse the rackId back out to compute the assignment.
> >>> 
> >>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
> >>> wondering if we can change the level of abstraction a little
> >>> bit and capture even more value here. One thing we wanted to
> >>> do in KIP-441, but decided to cut from the scope, was to
> >>> define a public TaskAssignor interface so that people can
> >>> plug in the whole task assignment algorithm.
> >>> 
> >>> In fact, there is already an internal config and interface
> >>> for this (`internal.task.assignor.class`:
> >>> `org.apache.kafka.streams.processor.internals.assignment.TaskAssignor`).
> >>> 
> >>> We kept that interface and config internal because the
> >>> current TaskAssignor interface has a number of flaws, but if
> >>> we correct those flaws, we can offer a nice public interface
> >>> that people can use to control the standby allocation, but
> >>> also active task allocation, based on the tags I suggested
> >>> in (1).
> >>> 
> >>> I don't think we need too much work to refactor
> >>> TaskAssignor; the main problems are that the assign method
> >>> mutates its input and that it doesn't expose the full
> >>> metadata from the cluster members. Therefore, if you like
> >>> this idea, we should propose to refactor TaskAssignor with:
> >>> * input: an immutable description of the cluster, including
> >>> current lags of all stateful tasks and current stateless
> >>> task assignments, as well as metadata for each host.
> >>> * output: an object describing the new assignment as well as
> >>> a flag on whether to schedule a followup probing rebalance.
> >>> 
> >>> An even more stretchy stretch goal would be to include
> >>> metadata of the brokers, which could be used to achieve
> >>> higher levels of rack awareness. For example, we could co-
> >>> locate tasks in the same "rack" (AZ) as the partition leader
> >>> for their input or output topics, to minimize cross-AZ
> >>> traffic. I'm not sure to what extent clients can learn the
> >>> relevant broker metadata, so this stretch might not be
> >>> currently feasible, but as long as we design the
> >>> TaskAssignor for extensibility, we can do something like
> >>> this in the future.
> >>> 
> >>> Thanks again for this proposal, I hope the above is more
> >>> inspiring than annoying :)
> >>> 
> >>> I really think your KIP is super high value in whatever form
> >>> you ultimately land on.
> >>> 
> >>> 
> >>> Thanks,
> >>> John
> >>> 
> >>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
> >>>> Hi John
> >>>> 
> >>>> Thanks for the feedback (and for the great work on KIP-441 :) ).
> >>>> Makes sense, I will add a section in the KIP explaining rack awareness
> >>>> at a high level and how it’s implemented in different distributed
> >>>> systems.
> >>>> 
> >>>> Thanks,
> >>>> Levani
> >>>> 
> >>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org> wrote:
> >>>>> 
> >>>>> Hi Levani,
> >>>>> 
> >>>>> Thanks for this KIP! I think this is really high value; it was 
> >>>>> something I was disappointed I didn’t get to do as part of KIP-441.
> >>>>> 
> >>>>> Rack awareness is a feature provided by other distributed systems as 
> >>>>> well. I wonder if your KIP could devote a section to summarizing what 
> >>>>> rack awareness looks like in other distributed systems, to help us put 
> >>>>> this design in context. 
> >>>>> 
> >>>>> Thanks!
> >>>>> John
> >>>>> 
> >>>>> 
> >>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
> >>>>>> Hello all,
> >>>>>> 
> >>>>>> I’d like to start discussion on KIP-708 [1] that aims to introduce 
> >>>>>> rack 
> >>>>>> aware standby task distribution in Kafka Streams.
> >>>>>> In addition to the changes mentioned in the KIP, I’d like to get
> >>>>>> some ideas on an additional change I have in mind.
> >>>>>> Assuming KIP moves forward, I was wondering if it makes sense to 
> >>>>>> configure Kafka Streams consumer instances with the rack ID passed 
> >>>>>> with 
> >>>>>> the new StreamsConfig#RACK_ID_CONFIG property. 
> >>>>>> In practice, that would mean that when “rack.id” is
> >>>>>> configured in Kafka Streams, it will automatically translate into 
> >>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer clients
> >>>>>> that are used by Kafka Streams internally.
> >>>>>> 
> >>>>>> [1] 
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
> >>>>>> 
> >>>>>> P.S. I have a draft PR ready; if it helps move the discussion
> >>>>>> forward, I can provide the draft PR link in this thread.
> >>>>>> 
> >>>>>> Regards, 
> >>>>>> Levani
> >>>> 
> >>> 
> >>> 
> >> 
> >> 
> 
>
