[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-07 Thread via GitHub


mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1286245745


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   ```suggestion
   null,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,

Review Comment:
   ```suggestion
   Importance.MEDIUM,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
 .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
 Type.LIST,
 Collections.emptyList(),
 atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
 Importance.MEDIUM,
 RACK_AWARE_ASSIGNMENT_TAGS_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,

Review Comment:
   ```suggestion
   Importance.MEDIUM,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,
+null,
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
 .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
 Type.LIST,
 Collections.emptyList(),
 atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
 Importance.MEDIUM,
 RACK_AWARE_ASSIGNMENT_TAGS_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   ```suggestion
   null,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
+Type.INT,

Review Comment:
   ```suggestion
   Type.INT,
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG

[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-04 Thread via GitHub


mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1284932353


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -890,6 +914,22 @@ public class StreamsConfig extends AbstractConfig {
 in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, 
EXACTLY_ONCE_V2),
 Importance.MEDIUM,
 PROCESSING_GUARANTEE_DOC)
+.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
+Type.STRING,
+RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, 
RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
+Importance.MEDIUM,
+RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
+.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
+Type.INT,
+null,

Review Comment:
   Thanks. We will need a docs PR anyway. Can you open one in the next few 
days and include this information? Also wondering whether the `description` in 
`StreamsConfig` should mention the different default values of the two 
assignors and explain that `null` means those defaults are used.
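
To make the `null` semantics concrete, here is a minimal sketch of how an unset cost could fall back to an assignor-specific default. The class name, the method name, and both default values are hypothetical and chosen only for illustration; they are not taken from the PR.

```java
// Hypothetical sketch: none of these names or values come from the PR.
final class RackAwareCostDefaults {
    // Placeholder per-assignor defaults, chosen only for illustration.
    static final int ASSIGNOR_A_TRAFFIC_COST = 10;
    static final int ASSIGNOR_B_TRAFFIC_COST = 1;

    // A null configured value means "not set by the user", so the
    // assignor-specific default applies; any explicit value wins.
    static int resolveTrafficCost(Integer configured, int assignorDefault) {
        return configured != null ? configured : assignorDefault;
    }

    public static void main(String[] args) {
        // Unset config: each assignor keeps its own default.
        System.out.println(resolveTrafficCost(null, ASSIGNOR_A_TRAFFIC_COST));
        System.out.println(resolveTrafficCost(null, ASSIGNOR_B_TRAFFIC_COST));
        // Explicit config: the user's value overrides either default.
        System.out.println(resolveTrafficCost(5, ASSIGNOR_A_TRAFFIC_COST));
    }
}
```

Using a boxed `Integer` with a `null` default is what lets the config distinguish "not set" from any concrete number, which is why the description would need to spell out what `null` means.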






[GitHub] [kafka] mjsax commented on a diff in pull request #14150: KAFKA-15022: [6/N] add rack aware assignor configs and update standby optimizer

2023-08-04 Thread via GitHub


mjsax commented on code in PR #14150:
URL: https://github.com/apache/kafka/pull/14150#discussion_r1284793025


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
++ " tasks to minimize cross rack traffic. Valid settings are : " 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware 
assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
++ ", which will compute minimum cross rack traffic assignment";

Review Comment:
   ```suggestion
   + ", which will compute minimum cross rack traffic assignment.";
   ```
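
For readers of the docs, a small sketch of how the strategy described by this doc string might be set and checked. The config key and the two values are copied from the constants in the hunk above; the class and method names are hypothetical, and the check is a plain exact-match stand-in assumed to mirror the intent of the `in(...)` validator, not Kafka's own validation code.

```java
import java.util.Properties;

// Hypothetical helper class; only the key and the two values come from the diff.
final class RackAwareStrategyExample {
    static final String STRATEGY_CONFIG = "rack.aware.assignment.strategy";
    static final String NONE = "NONE";
    static final String MIN_TRAFFIC = "MIN_TRAFFIC";

    // Builds properties that opt in to the MIN_TRAFFIC strategy.
    static Properties withMinTraffic() {
        Properties props = new Properties();
        props.setProperty(STRATEGY_CONFIG, MIN_TRAFFIC);
        return props;
    }

    // Exact-match check against the two documented settings (assumption:
    // this mirrors the intent of the in(...) validator in the define call).
    static boolean isValidStrategy(String value) {
        return NONE.equals(value) || MIN_TRAFFIC.equals(value);
    }

    public static void main(String[] args) {
        Properties props = withMinTraffic();
        String strategy = props.getProperty(STRATEGY_CONFIG, NONE);
        System.out.println(strategy + " valid: " + isValidStrategy(strategy));
    }
}
```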



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"

Review Comment:
   ```suggestion
   public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning"
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = 
"default.client.supplier";
 public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier 
class that implements the 
org.apache.kafka.streams.KafkaClientSupplier interface.";
 
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = 
"MIN_TRAFFIC";
+
+/** {@code } rack.aware.assignment.strategy */
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = 
"rack.aware.assignment.strategy";
+public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The 
strategy we use for rack aware assignment. Rack aware assignment will take 
client.rack and racks of TopicPartition into account when assigning"
++ " tasks to minimize cross rack traffic. Valid settings are : " 
+ RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + ", which will disable rack aware 
assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
++ ", which will compute minimum cross rack traffic assignment";
+
+@SuppressWarnings("WeakerAccess")
+public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = 
"rack.aware.assignment.traffic_cost";
+public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost 
associated with cross rack traffic. This config and 
rack.aware.assignment.non_overlap_cost controls whether the "
++ "optimization algorithm favors minimizing cross rack traffic or 
minimize the movement of tasks in existing assignment. If set a larger value " 
+ RackAwareTaskAssignor.class.getName() + " will "
++ "optimize minimizing cross rack traffic";

Review Comment:
   ```suggestion
   + "optimize for minimizing cross rack traffic.";
   ```
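
The trade-off this doc string describes can be illustrated with a simple weighted sum. This is only a sketch under stated assumptions: the cost function, the class name, and all numbers are hypothetical, and the real assignor's objective is not shown in this diff.

```java
// Hypothetical illustration of the traffic_cost / non_overlap_cost trade-off.
final class RackAwareCostTradeoff {
    // Weighted cost of a candidate assignment: cross-rack reads are weighted
    // by trafficCost, tasks moved away from the existing assignment by
    // nonOverlapCost. Not the assignor's actual objective.
    static long cost(int crossRackReads, int movedTasks, int trafficCost, int nonOverlapCost) {
        return (long) trafficCost * crossRackReads + (long) nonOverlapCost * movedTasks;
    }

    // Returns true if moving tasks (candidate B) is cheaper than staying put (candidate A).
    static boolean prefersMove(int trafficCost, int nonOverlapCost) {
        long stay = cost(4, 0, trafficCost, nonOverlapCost); // A: 4 cross-rack reads, 0 moves
        long move = cost(1, 2, trafficCost, nonOverlapCost); // B: 1 cross-rack read, 2 moves
        return move < stay;
    }

    public static void main(String[] args) {
        // Larger traffic cost: reducing cross-rack traffic wins.
        System.out.println(prefersMove(10, 1));
        // Larger non-overlap cost: keeping the existing assignment wins.
        System.out.println(prefersMove(1, 10));
    }
}
```

With a traffic cost of 10 and a non-overlap cost of 1, staying costs 40 and moving costs 12, so the move is preferred; with the weights swapped, staying costs 4 and moving costs 21, so the existing assignment is kept.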



##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
 public static f