[ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16999090#comment-16999090
 ] 

Levani Kokhreidze edited comment on KAFKA-6718 at 12/18/19 12:29 PM:
---------------------------------------------------------------------

Thanks for the ideas [~vvcephei], I will definitely look into it. My initial idea 
was around how Kubernetes [affinity rules|#affinity-and-anti-affinity] work. 
It's similar to the Elasticsearch mechanisms you have provided (tbh, I think 
most distributed systems share the same idea behind rack awareness and 
standby tasks). I think this task gives us the opportunity to let users 
specify something similar to affinity/anti-affinity rules through an 
interface that ships with a default implementation out of the box (rack 
awareness, maybe) but could be extended to other metrics? It would be 
amazing to have disk space awareness as well, but since disk space isn't a 
static value like rack.id, it may be challenging to implement. Still, such a 
feature would definitely be useful.
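A minimal Java sketch of the pluggable-rule idea, assuming a hypothetical `StandbyAffinityRule` interface (not part of the Kafka Streams API) that decides whether a candidate instance may host a standby, given the instances already hosting copies of the same task. The default `RackAntiAffinityRule` rejects candidates that share a `rack.id` tag with an existing copy. `ClientInfo`, the tag map, and the `and` combinator are likewise illustrative assumptions (records require Java 16+).

```java
import java.util.Map;
import java.util.Set;

// Demo entry point; declared first so single-file launch (`java File.java`) finds main.
class AffinityRuleDemo {
    public static void main(String[] args) {
        ClientInfo active = new ClientInfo("c1", Map.of("rack.id", "r1"));
        ClientInfo sameRack = new ClientInfo("c2", Map.of("rack.id", "r1"));
        ClientInfo otherRack = new ClientInfo("c3", Map.of("rack.id", "r2"));
        StandbyAffinityRule rule = new RackAntiAffinityRule("rack.id");
        System.out.println(rule.accepts(sameRack, Set.of(active)));  // false
        System.out.println(rule.accepts(otherRack, Set.of(active))); // true
    }
}

/** Hypothetical view of a Kafka Streams instance: an id plus static tags such as "rack.id". */
record ClientInfo(String clientId, Map<String, String> tags) {}

/** Hypothetical pluggable placement rule for standby tasks (not a Kafka Streams API). */
@FunctionalInterface
interface StandbyAffinityRule {
    /** True if {@code candidate} may host a standby of a task already hosted by {@code hostsOfTask}. */
    boolean accepts(ClientInfo candidate, Set<ClientInfo> hostsOfTask);

    /** Combines two rules: a candidate must satisfy both. */
    default StandbyAffinityRule and(StandbyAffinityRule other) {
        return (candidate, hosts) -> accepts(candidate, hosts) && other.accepts(candidate, hosts);
    }
}

/** Default rule: anti-affinity on a rack tag; untagged instances are treated as rack-unaware. */
final class RackAntiAffinityRule implements StandbyAffinityRule {
    private final String rackTag;

    RackAntiAffinityRule(String rackTag) {
        this.rackTag = rackTag;
    }

    @Override
    public boolean accepts(ClientInfo candidate, Set<ClientInfo> hostsOfTask) {
        String rack = candidate.tags().get(rackTag);
        if (rack == null) {
            return true; // no rack configured: fall back to rack-unaware placement
        }
        for (ClientInfo host : hostsOfTask) {
            if (rack.equals(host.tags().get(rackTag))) {
                return false; // a copy of this task already lives on this rack
            }
        }
        return true;
    }
}
```

Because the rule is a single-method interface, other metrics could be layered on with `and(...)` without touching the rack-aware default.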

Actually, the disk space required by standby tasks can be estimated, so in 
theory it can be encoded into the assignment (disk space is just an example; 
any other reasonable metric would work as well). The interface that defines 
the affinity rule would then receive standby tasks with the additional encoded 
metrics and either accept or reject them. A rejected task is routed to another 
Kafka Streams instance, until one accepts it. If not enough suitable Kafka 
Streams instances can be found to satisfy `num.standby.replicas`, we can log a 
warning, as is done today when the number of Kafka Streams instances is less 
than `num.standby.replicas`. This is a rough idea, which may not work at all, 
since I haven't yet looked in detail at how standby tasks currently work in 
Kafka Streams :) Sorry if this doesn't all make sense. But I would definitely 
love to make this interface as extensible as possible, so we aren't limited 
to rack awareness alone.
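The accept/reject routing described above could look roughly like this Java sketch. Everything here is hypothetical: `StandbyTask` carries an assumed `estimatedBytes` metric, `Client` reports assumed free disk space, and `StandbyRouter.assign` offers each standby to instances in list order (skipping the one hosting the active task) until `numStandbyReplicas` instances accept, logging a warning otherwise. Real Kafka Streams assignment also weighs stickiness and load balancing, which this ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

class StandbyRoutingDemo {
    public static void main(String[] args) {
        List<Client> clients = List.of(new Client("a", 10), new Client("b", 100), new Client("c", 50));
        List<StandbyTask> tasks = List.of(new StandbyTask("0_0", "a", 40), new StandbyTask("0_1", "b", 60));
        // prints {0_0=[b], 0_1=[]} and logs a warning for task 0_1
        System.out.println(StandbyRouter.assign(tasks, clients, 1, new DiskSpaceRule()));
    }
}

/** Hypothetical: a standby task carrying an estimated state-store size as its "encoded metric". */
record StandbyTask(String taskId, String activeClientId, long estimatedBytes) {}

/** Hypothetical: a Kafka Streams instance and the free disk space it reported. */
final class Client {
    final String id;
    long freeBytes;

    Client(String id, long freeBytes) {
        this.id = id;
        this.freeBytes = freeBytes;
    }
}

/** Hypothetical affinity rule: accept or reject one standby task for one candidate instance. */
interface AffinityRule {
    boolean accept(StandbyTask task, Client candidate);
}

/** Example metric-based rule: accept only if the estimated state fits on the candidate's disk. */
final class DiskSpaceRule implements AffinityRule {
    @Override
    public boolean accept(StandbyTask task, Client candidate) {
        return candidate.freeBytes >= task.estimatedBytes();
    }
}

final class StandbyRouter {
    private static final Logger LOG = Logger.getLogger(StandbyRouter.class.getName());

    /** Offers each standby to instances in order until enough accept; warns when too few do. */
    static Map<String, List<String>> assign(List<StandbyTask> tasks, List<Client> clients,
                                            int numStandbyReplicas, AffinityRule rule) {
        Map<String, List<String>> placement = new LinkedHashMap<>();
        for (StandbyTask task : tasks) {
            List<String> hosts = new ArrayList<>();
            for (Client candidate : clients) {
                if (hosts.size() == numStandbyReplicas) {
                    break;
                }
                if (candidate.id.equals(task.activeClientId())) {
                    continue; // in this sketch a standby never shares an instance with its active task
                }
                if (rule.accept(task, candidate)) {
                    hosts.add(candidate.id);
                    candidate.freeBytes -= task.estimatedBytes(); // reserve the space
                }
                // otherwise the task was rejected, so it is offered to the next instance
            }
            if (hosts.size() < numStandbyReplicas) {
                LOG.warning("Only " + hosts.size() + " of " + numStandbyReplicas
                        + " standby replicas could be placed for task " + task.taskId());
            }
            placement.put(task.taskId(), hosts);
        }
        return placement;
    }
}
```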


> Rack Aware Stand-by Task Assignment for Kafka Streams
> -----------------------------------------------------
>
>                 Key: KAFKA-6718
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6718
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Deepak Goyal
>            Priority: Major
>              Labels: needs-kip
>
> Machines in a data centre are sometimes grouped into racks. Racks provide 
> isolation, as each rack may be in a different physical location and have its 
> own power source. When tasks are properly replicated across racks, this 
> provides fault tolerance: if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented for Kafka brokers by 
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment],
>  but we needed something similar for task assignment at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica (standby) tasks to be assigned to different 
> racks for fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replicas + active)
>  # If no rack ID is provided, the cluster behaves rack-unaware.
>  # If the same rack ID is given to all nodes, the cluster behaves rack-unaware.
>  # If (totalTasks <= number of racks), the cluster is rack-aware, i.e. 
> each replica task is assigned to a different rack.
>  # If (totalTasks > number of racks), tasks are first assigned to 
> different racks; the remaining tasks are assigned to the least-loaded node, 
> cluster-wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which 
> helps StickyPartitionAssignor assign tasks so that no two replica tasks are 
> on the same rack, if possible.
>  Beyond that, it also helps maintain stickiness within the rack.
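The four rules in the description can be read as a two-phase greedy placement: prefer a node on a rack that holds no copy of the task yet, and once racks are exhausted (or no rack IDs exist) fall back to the least-loaded node cluster-wide. The following Java sketch is one interpretation, not the reporter's implementation: `Node`, its `load` counter, and `RackAwareStandbyAssigner` are hypothetical, a `null` rack ID stands for "no rack ID provided", and stickiness within the rack is not modelled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

class RackAwareDemo {
    public static void main(String[] args) {
        Node active = new Node("n1", "rackA", 0);
        List<Node> nodes = List.of(active, new Node("n2", "rackA", 0),
                new Node("n3", "rackB", 1), new Node("n4", "rackC", 5));
        // x = 3 standbys, totalTasks = 4 > 3 racks: prints n3 on rackB, n4 on rackC, then n2 on rackA
        for (Node n : RackAwareStandbyAssigner.assignStandbys(active, nodes, 3)) {
            System.out.println(n.id + " on " + n.rackId);
        }
    }
}

/** Hypothetical cluster member; rackId == null means no rack ID was configured. */
final class Node {
    final String id;
    final String rackId;
    int load; // tasks currently assigned; used to pick the "least loaded" node

    Node(String id, String rackId, int load) {
        this.id = id;
        this.rackId = rackId;
        this.load = load;
    }
}

final class RackAwareStandbyAssigner {
    /** Chooses up to numStandbyReplicas nodes for standbys of a task whose active copy is on `active`. */
    static List<Node> assignStandbys(Node active, List<Node> nodes, int numStandbyReplicas) {
        List<Node> chosen = new ArrayList<>();
        Set<String> usedRacks = new HashSet<>();
        if (active.rackId != null) {
            usedRacks.add(active.rackId);
        }
        // Least loaded first; ties broken by id for deterministic output.
        Comparator<Node> leastLoaded = Comparator.comparingInt((Node n) -> n.load)
                .thenComparing((Node n) -> n.id);
        for (int i = 0; i < numStandbyReplicas; i++) {
            // Phase 1 (rules 3 and 4): a node on a rack that holds no copy of this task yet.
            Optional<Node> pick = nodes.stream()
                    .filter(n -> n != active && !chosen.contains(n))
                    .filter(n -> n.rackId != null && !usedRacks.contains(n.rackId))
                    .min(leastLoaded);
            // Phase 2 (rules 1, 2 and 4): racks exhausted or unknown, so take the least-loaded node.
            if (pick.isEmpty()) {
                pick = nodes.stream()
                        .filter(n -> n != active && !chosen.contains(n))
                        .min(leastLoaded);
            }
            if (pick.isEmpty()) {
                break; // fewer nodes than replicas; a real assignor would log a warning here
            }
            Node node = pick.get();
            chosen.add(node);
            node.load++;
            if (node.rackId != null) {
                usedRacks.add(node.rackId);
            }
        }
        return chosen;
    }
}
```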



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
