lihaosky commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284910264
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java: ########## @@ -267,25 +267,46 @@ public static class AssignmentConfigs { public final int numStandbyReplicas; public final long probingRebalanceIntervalMs; public final List<String> rackAwareAssignmentTags; + public final Integer rackAwareAssignmentTrafficCost; Review Comment: It can be null and we will use default in assignor. It's related to your questions in https://github.com/apache/kafka/pull/14139#discussion_r1284910181 ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + Type.STRING, + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) + .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, + Type.INT, + null, Review Comment: The default value is different for `HAAssignor` and `StickyAssignor`, so the null here means use default in assignor. If it's set, it would override assignor's default. I guess we can guide to set relative values. Maybe we can mention default in HAAssignor is 10, 1 and in sticky assignor, it will be 1, 10... ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1378,6 +1380,48 @@ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() { assertEquals(0, configs.size()); } + @Test + public void shouldReturnDefaultRackAwareAssignmentConfig() { + final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); + assertEquals("NONE", strategy); + } + + @Test + public void shouldtSetMinTrafficRackAwareAssignmentConfig() { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); + assertEquals("MIN_TRAFFIC", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); Review Comment: It will use default cost in assignor -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org