Hello all,

I’ve updated KIP-708 [1] to reflect the latest discussion outcomes. 
I’m looking forward to your feedback.

Regards,
Levani

[1] - 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+awarness+for+Kafka+Streams

> On 2. Feb 2021, at 22:03, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> 
> Hi John.
> 
> Thanks a lot for this detailed analysis! 
> Yes, that is what I had in mind as well. 
> I also like that idea of having “task.assignment.awareness” configuration
> to tell which instance tags can be used for rack awareness.
> I may borrow it for this KIP if you don’t mind :) 
> 
> Thanks again John for this discussion, it’s really valuable.
> 
> I’ll update the proposal and share it once again in this discussion thread.
> 
> Regards,
> Levani 
> 
>> On 2. Feb 2021, at 18:47, John Roesler <vvcep...@apache.org 
>> <mailto:vvcep...@apache.org>> wrote:
>> 
>> Hi Levani,
>> 
>> 1. Thanks for the details.
>> 
>> I figured it must be something like this two-dimensional definition of 
>> "rack".
>> 
>> It does seem like, if we make the config take a list of tags, we can define
>> the semantics to be that the system will make a best effort to distribute
>> the standbys over each rack dimension.
>> 
>> In your example, there are two clusters and three AZs. The example
>> configs would be:
>> 
>> Node 1:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1a
>> task.assignment.awareness: cluster,zone
>> 
>> Node 2:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1b
>> task.assignment.awareness: cluster,zone
>> 
>> Node 3:
>> instance.tag.cluster: K8s_Cluster1
>> instance.tag.zone: eu-central-1c
>> task.assignment.awareness: cluster,zone
>> 
>> Node 4:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1a
>> task.assignment.awareness: cluster,zone
>> 
>> Node 5:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1b
>> task.assignment.awareness: cluster,zone
>> 
>> Node 6:
>> instance.tag.cluster: K8s_Cluster2
>> instance.tag.zone: eu-central-1c
>> task.assignment.awareness: cluster,zone
>> 
>> 
>> Now, if we have a task 0_0 with an active and two replicas,
>> there are three total copies of the task to distribute over:
>> * 6 instances
>> * 2 clusters
>> * 3 zones
>> 
>> There is a constraint that we _cannot_ assign two copies of a task
>> to a single instance, but it seems like the default rack awareness
>> would permit us to assign two copies of a task to a rack, if (and only
>> if) the number of copies is greater than the number of racks.
>> 
>> So, the assignment we would get is like this:
>> * assigned to three different instances
>> * one copy in each of zone a, b, and c
>> * two copies in one cluster and one in the other cluster
>> 
>> For example, we might have 0_0 assigned to:
>> * Node 1 (cluster 1, zone a)
>> * Node 5 (cluster 2, zone b)
>> * Node 3 (cluster 1, zone c)
>> 
>> Is that what you were also thinking?
>> 
>> Thanks,
>> -John
>> 
>> On Tue, Feb 2, 2021, at 02:24, Levani Kokhreidze wrote:
>>> Hi John,
>>> 
>>> 1. Main reason was that it seemed easier change compared to having 
>>> multiple tags assigned to each host.
>>> 
>>> ---
>>> 
>>> Answering your question what use-case I have in mind:
>>> Lets say we have two Kubernetes clusters running the same Kafka Streams 
>>> application. 
>>> And each Kubernetes cluster is spanned across multiple AZ. 
>>> So the setup overall looks something like this:
>>> 
>>> K8s_Cluster1 [eu-central-1a, eu-central-1b, eu-central-1c]
>>> K8s_Cluster2 [eu-central-1a, eu-central-1b, eu-central-1c]
>>> 
>>> Now, if Kafka Streams application is launched in K8s_Clister1: 
>>> eu-central-1a,
>>> ideally I would want standby task to be created in the different K8s 
>>> cluster and region.
>>> So in this example it can be K8s_Cluster2: [eu-central-1b, 
>>> eu-central-1c]
>>> 
>>> But giving it a bit more thought, this can be implemented if we change 
>>> semantics of “tags” a bit.
>>> So instead of doing full match with tags, we can do iterative matching 
>>> and it should work.
>>> (If this is what you had in mind, apologies for the misunderstanding).
>>> 
>>> If we consider the same example as mentioned above, for the active task 
>>> we would
>>> have following tags: [K8s_Cluster1, eu-central-1a]. In order to 
>>> distribute standby task
>>> in the different K8s cluster, plus in the different AWS region, standby 
>>> task assignment 
>>> algorithm can compare each tag by index. So steps would be something 
>>> like:
>>> 
>>> // this will result in selecting client in the different K8s cluster
>>> 1. clientsInDifferentCluster = (tagsOfActiveTask[0] != allClientTags[0])
>>> // this will result in selecting the client in different AWS region
>>> 2. selectedClientForStandbyTask = (tagsOfActiveTask[1] != 
>>> clientsInDifferentCluster[1] )
>>> 
>>> WDYT?
>>> 
>>> If you agree with the use-case I’ve mentioned, the pluggable assignor 
>>> can be differed to another KIP, yes.
>>> As it won’t be required for this KIP and use-cases I had in mind to 
>>> work.
>>> 
>>> Regards,
>>> Levani 
>>> 
>>> 
>>>> On 2. Feb 2021, at 07:55, John Roesler <vvcep...@apache.org 
>>>> <mailto:vvcep...@apache.org>> wrote:
>>>> 
>>>> Hello Levani,
>>>> 
>>>> Thanks for the reply. 
>>>> 
>>>> 1. Interesting; why did you change your mind?
>>>> 
>>>> I have a gut feeling that we can achieve pretty much any rack awareness 
>>>> need that people have by using purely config, which is obviously much 
>>>> easier to use. But if you had a case in mind where this wouldn’t work, it 
>>>> would be good to know. 
>>>> 
>>>> In fact, if that is true, then perhaps you could just defer the whole idea 
>>>> of a pluggable interface (point 2) to a separate KIP. I do think a 
>>>> pluggable assignor would be extremely valuable, but it might be nice to 
>>>> cut the scope of KIP-708 if just a config will suffice.
>>>> 
>>>> What do you think?
>>>> Thanks,
>>>> John
>>>> 
>>>> 
>>>> On Mon, Feb 1, 2021, at 06:07, Levani Kokhreidze wrote:
>>>>> Hi John,
>>>>> 
>>>>> Thanks a lot for thorough feedback, it’s really valuable.
>>>>> 
>>>>> 1. Agree with this. Had the same idea initially.
>>>>> We can set some upper limit in terms of what’s 
>>>>> the max number of tags users can set to make 
>>>>> sure it’s not overused. By default, we can create 
>>>>> standby tasks where tags are different from active task (full match). 
>>>>> This should mimic default rack awareness behaviour.
>>>>> 
>>>>> 2. I like the idea and I’d be happy to work on 
>>>>> refactoring TaskAssignor to accommodate rack awareness use-case. 
>>>>> When I was going through the code, it felt way more natural 
>>>>> to use pluggable TaskAssignor for achieving rack awareness 
>>>>> instead of introducing new interface and contract. 
>>>>> But I thought approach mentioned in the KIP is simpler so 
>>>>> decided to move forward with it as an initial proposal :). 
>>>>> But I agree with you, it will be much better if we can have 
>>>>> TaskAssignor as pluggable interface users can use.
>>>>> One potential challenge I see with this is that, if we just let
>>>>> users implement TaskAssignor in its current form, we will be forcing
>>>>> users to implement functionality for active task assignment, as well as
>>>>> standby task assignment. This feels like not very clear contract, 
>>>>> because with
>>>>> just TaskAssignor interface it’s not really clear they one needs to 
>>>>> allocate 
>>>>> standby tasks as well. We can enforce it on some level with the return 
>>>>> object
>>>>> You’ve mentioned TaskAssignor#assign has to return, but still feels 
>>>>> error prone.
>>>>> In addition, I suspect in most of the cases users would want
>>>>> to control standby task assignment and leave active task assignment as 
>>>>> is. 
>>>>> To make implementation of standby task assignment easier for users, 
>>>>> what if
>>>>> we decouple active and standby task assignment from the `TaskAssignor`?
>>>>> Idea I have in mind is to split TaskAssignor into ActiveTaskAssignor 
>>>>> and StandbyTaskAssignor
>>>>> and let users add their own implementation for them separately if they 
>>>>> like via config.
>>>>> 
>>>>> If this approach sounds reasonable, I’ll work on updating KIP this week. 
>>>>> 
>>>>> Thanks,
>>>>> Levani
>>>>> 
>>>>>> On 28. Jan 2021, at 19:20, John Roesler <vvcep...@apache.org 
>>>>>> <mailto:vvcep...@apache.org>> wrote:
>>>>>> 
>>>>>> Thanks, Levani!
>>>>>> 
>>>>>> I was reflecting more on your KIP last night.
>>>>>> 
>>>>>> One thing I should mention is that I have previously used
>>>>>> the rack awareness feature of Elasticsearch, and found it to
>>>>>> be pretty intuitive and also capable of what we needed in
>>>>>> our AWS clusters. As you look at related work, you might
>>>>>> take ES into consideration.
>>>>>> 
>>>>>> I was also had some thoughts about your proposal.
>>>>>> 
>>>>>> 1. I'm wondering if we instead allow people to add arbitrary
>>>>>> tags to each host, and then have a configuration to specify
>>>>>> a combination of tags to use for rack awareness. This seems
>>>>>> easier to manage than for the use case you anticipate where
>>>>>> people would concatenate rackId = (clusterId + AZ), and then
>>>>>> have to parse the rackId back out to compute the assignment.
>>>>>> 
>>>>>> 2. About the proposed RackAwareStandbyTaskAssignor, I'm
>>>>>> wondering if we can change the level of abstraction a little
>>>>>> bit and capture even more value here. One thing we wanted to
>>>>>> do in KIP-441, but decided to cut from the scope, was to
>>>>>> define a public TaskAssignor interface so that people can
>>>>>> plug in the whole task assignment algorithm.
>>>>>> 
>>>>>> In fact, there is already an internal config and interface
>>>>>> for this (`internal.task.assignor.class`:
>>>>>> `org.apache.kafka.streams.processor.internals.assignment.Tas
>>>>>> kAssignor`).
>>>>>> 
>>>>>> We kept that interface and config internal because the
>>>>>> current TaskAssignor interface has a number of flaws, but if
>>>>>> we correct those flaws, we can offer a nice public interface
>>>>>> that people can use to control the standby allocation, but
>>>>>> also active task allocation, based on the tags I suggested
>>>>>> in (1).
>>>>>> 
>>>>>> I don't think we need too much work to refactor
>>>>>> TaskAssignor, the main problems are that the assign method
>>>>>> mutates its input and that it doesn't expose the full
>>>>>> metadata from the cluster members. Therefore, if you like
>>>>>> this idea, we should propose to refactor TaskAssignor with:
>>>>>> * input: an immutable description of the cluster, including
>>>>>> current lags of all stateful tasks and current stateless
>>>>>> task assignments, as well as metadata for each host.
>>>>>> * output: an object describing the new assignment as well as
>>>>>> a flag on whether to schedule a followup probing rebalance.
>>>>>> 
>>>>>> An even more stretchy stretch goal would be to include
>>>>>> metadata of the brokers, which could be used to achieve
>>>>>> higher levels of rack awareness. For example, we could co-
>>>>>> locate tasks in the same "rack" (AZ) as the partition leader
>>>>>> for their input or output topics, to minimize cross-AZ
>>>>>> traffic. I'm not sure to what extent clients can learn the
>>>>>> relevant broker metadata, so this stretch might not be
>>>>>> currently feasible, but as long as we design the
>>>>>> TaskAssignor for extensibility, we can do something like
>>>>>> this in the future.
>>>>>> 
>>>>>> Thanks again for this proposal, I hope the above is more
>>>>>> inspiring than annoying :)
>>>>>> 
>>>>>> I really think your KIP is super high value in whatever form
>>>>>> you ultimately land on.
>>>>>> 
>>>>>> 
>>>>>> Thanks,
>>>>>> John
>>>>>> 
>>>>>> On Thu, 2021-01-28 at 13:08 +0200, Levani Kokhreidze wrote:
>>>>>>> Hi John
>>>>>>> 
>>>>>>> Thanks for the feedback (and for the great work on KIP441 :) ). 
>>>>>>> Makes sense, will add a section in the KIP explaining rack awarenesses 
>>>>>>> on high level and how it’s implemented in the different distributed 
>>>>>>> systems.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Levani
>>>>>>> 
>>>>>>>> On 27. Jan 2021, at 16:07, John Roesler <vvcep...@apache.org 
>>>>>>>> <mailto:vvcep...@apache.org>> wrote:
>>>>>>>> 
>>>>>>>> Hi Levani,
>>>>>>>> 
>>>>>>>> Thanks for this KIP! I think this is really high value; it was 
>>>>>>>> something I was disappointed I didn’t get to do as part of KIP-441.
>>>>>>>> 
>>>>>>>> Rack awareness is a feature provided by other distributed systems as 
>>>>>>>> well. I wonder if your KIP could devote a section to summarizing what 
>>>>>>>> rack awareness looks like in other distributed systems, to help us put 
>>>>>>>> this design in context. 
>>>>>>>> 
>>>>>>>> Thanks!
>>>>>>>> John
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jan 26, 2021, at 16:46, Levani Kokhreidze wrote:
>>>>>>>>> Hello all,
>>>>>>>>> 
>>>>>>>>> I’d like to start discussion on KIP-708 [1] that aims to introduce 
>>>>>>>>> rack 
>>>>>>>>> aware standby task distribution in Kafka Streams.
>>>>>>>>> In addition to changes mentioned in the KIP, I’d like to get some 
>>>>>>>>> ideas 
>>>>>>>>> on additional change I have in mind. 
>>>>>>>>> Assuming KIP moves forward, I was wondering if it makes sense to 
>>>>>>>>> configure Kafka Streams consumer instances with the rack ID passed 
>>>>>>>>> with 
>>>>>>>>> the new StreamsConfig#RACK_ID_CONFIG property. 
>>>>>>>>> In practice, that would mean that when “rack.id <http://rack.id/> 
>>>>>>>>> <http://rack.id/ <http://rack.id/>>” is 
>>>>>>>>> configured in Kafka Streams, it will automatically translate into 
>>>>>>>>> ConsumerConfig#CLIENT_RACK_ID config for all the KafkaConsumer 
>>>>>>>>> clients 
>>>>>>>>> that is used by Kafka Streams internally.
>>>>>>>>> 
>>>>>>>>> [1] 
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>  
>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>
>>>>>>>>>  
>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor
>>>>>>>>>  
>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-708:+Rack+aware+Kafka+Streams+with+pluggable+StandbyTask+assignor>>
>>>>>>>>> 
>>>>>>>>> P.S 
>>>>>>>>> I have draft PR ready, if it helps the discussion moving forward, I 
>>>>>>>>> can 
>>>>>>>>> provide the draft PR link in this thread.
>>>>>>>>> 
>>>>>>>>> Regards, 
>>>>>>>>> Levani
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 

Reply via email to