[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1286245745 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, Review Comment: ```suggestion null, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, Review Comment: ```suggestion Importance.MEDIUM, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, Type.LIST, Collections.emptyList(), atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TAGS_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, Review Comment: ```suggestion Importance.MEDIUM, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, +null, +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, Type.LIST, Collections.emptyList(), atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TAGS_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, Review Comment: ```suggestion null, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, +Type.INT, Review Comment: ```suggestion Type.INT, ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG
[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284932353 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) +.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, +Type.STRING, +RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, +in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), +Importance.MEDIUM, +RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) +.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, +Type.INT, +null, Review Comment: Thanks. We will need a docs PR anyway -- can you open one in the next days and include this information? -- Wondering if the `description` in `StreamsConfig` should also mention the different default values for both assignors and explain that `null` means use those defaults? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer
mjsax commented on code in PR #14150: URL: https://github.com/apache/kafka/pull/14150#discussion_r1284793025 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface."; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + +/** {@code } rack.aware.assignment.strategy */ +@SuppressWarnings("WeakerAccess") +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" ++ " tasks to minimize cross rack traffic. Valid settings are : " + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC ++ ", which will compute minimum cross rack traffic assignment"; Review Comment: ```suggestion + ", which will compute minimum cross rack traffic assignment."; ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface."; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + +/** {@code } rack.aware.assignment.strategy */ +@SuppressWarnings("WeakerAccess") +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" Review Comment: ```suggestion public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface."; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + +/** {@code } rack.aware.assignment.strategy */ +@SuppressWarnings("WeakerAccess") +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; +public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" ++ " tasks to minimize cross rack traffic. Valid settings are : " + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC ++ ", which will compute minimum cross rack traffic assignment"; + +@SuppressWarnings("WeakerAccess") +public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost"; +public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and rack.aware.assignment.non_overlap_cost controls whether the " ++ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " ++ "optimize minimizing cross rack traffic"; Review Comment: ```suggestion + "optimize for minimizing cross rack traffic."; ``` ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static f