Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-05 Thread via GitHub


ableegoldman commented on PR #16147:
URL: https://github.com/apache/kafka/pull/16147#issuecomment-2151097972

   Cherrypicked to 3.8


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-04 Thread via GitHub


ableegoldman merged PR #16147:
URL: https://github.com/apache/kafka/pull/16147





Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-04 Thread via GitHub


ableegoldman commented on PR #16147:
URL: https://github.com/apache/kafka/pull/16147#issuecomment-2148162418

   Test failures are unrelated. Merging to trunk





Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-03 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1625023687


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
   + "tasks for source topics vs 
changelog topics.");
 }
 
-final Set logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
-final Set allTopicPartitions = new 
HashSet<>();
+final Set topicsRequiringRackInfo = new 
HashSet<>();
+final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+final Runnable fetchRackInformation = () -> {
+if (!rackInformationFetched.get()) {
+RackUtils.annotateTopicPartitionsWithRackInfo(cluster,

Review Comment:
   Very small nit (tack onto any follow-up PR): weird line break. Either keep 
everything on one line or move the `cluster` argument to the second line with 
the other params.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -527,35 +531,44 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
   + "tasks for source topics vs 
changelog topics.");
 }
 
-final Set logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
-final Set allTopicPartitions = new 
HashSet<>();
+final Set topicsRequiringRackInfo = new 
HashSet<>();
+final AtomicBoolean rackInformationFetched = new AtomicBoolean(false);
+final Runnable fetchRackInformation = () -> {
+if (!rackInformationFetched.get()) {
+RackUtils.annotateTopicPartitionsWithRackInfo(cluster,
+internalTopicManager, topicsRequiringRackInfo);
+rackInformationFetched.set(true);
+}
+};
+
 final Map> topicPartitionsForTask = 
new HashMap<>();
+final Set logicalTaskIds = 
unmodifiableSet(sourcePartitionsForTask.keySet());
 logicalTaskIds.forEach(taskId -> {
 final Set topicPartitions = new HashSet<>();
 
 for (final TopicPartition topicPartition : 
sourcePartitionsForTask.get(taskId)) {
 final boolean isSource = true;
 final boolean isChangelog = 
changelogPartitionsForTask.get(taskId).contains(topicPartition);
 final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
-topicPartition, isSource, isChangelog, null);
-allTopicPartitions.add(racklessTopicPartition);
+topicPartition, isSource, isChangelog, 
fetchRackInformation);
+topicsRequiringRackInfo.add(racklessTopicPartition);
 topicPartitions.add(racklessTopicPartition);
 }
 
 for (final TopicPartition topicPartition : 
changelogPartitionsForTask.get(taskId)) {
 final boolean isSource = 
sourcePartitionsForTask.get(taskId).contains(topicPartition);
 final boolean isChangelog = true;
 final DefaultTaskTopicPartition racklessTopicPartition = new 
DefaultTaskTopicPartition(
-topicPartition, isSource, isChangelog, null);
-allTopicPartitions.add(racklessTopicPartition);
+topicPartition, isSource, isChangelog, 
fetchRackInformation);
+if (publicAssignmentConfigs.numStandbyReplicas() > 0) {

Review Comment:
   Note that active tasks will also read from changelog topics (though only 
during the restore phase), so we should be adding changelogs to the 
`topicsRequiringRackInfo` set even if there are no standbys configured
   
   Again you can tack this onto PR #17 or whatever PR is next
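
To illustrate the point above, here is a minimal sketch that collects the topics needing rack information without looking at the standby count. It assumes plain topic-name strings in place of `DefaultTaskTopicPartition`; the class and method names are hypothetical and are not part of `StreamsPartitionAssignor`.

```java
import java.util.HashSet;
import java.util.Set;

final class RackInfoTopicsSketch {
    // Hypothetical helper: topic names stand in for DefaultTaskTopicPartition.
    static Set<String> topicsRequiringRackInfo(final Set<String> sourceTopics,
                                               final Set<String> changelogTopics) {
        final Set<String> topics = new HashSet<>(sourceTopics);
        // Deliberately no numStandbyReplicas check: active tasks also read
        // changelogs while restoring, so changelogs are always included.
        topics.addAll(changelogTopics);
        return topics;
    }
}
```

With this shape, a stateful application configured with zero standbys would still have rack information fetched for its changelog partitions.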



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -573,14 +586,24 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
 ));
 
 return new DefaultApplicationState(
-assignmentConfigs.toPublicAssignmentConfigs(),
+publicAssignmentConfigs,
 logicalTasks,
 clientMetadataMap
 );
 }
 
-private static void processStreamsPartitionAssignment(final Map clientMetadataMap,
-  final TaskAssignment 
taskAssignment) {
+private void processStreamsPartitionAssignment(final 
org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+   final TaskAssignment 
taskAssignment,
+   final AssignmentError 
assignmentError,
+  

Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-03 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624682684


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState 
applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new 
HashSet<>()));
+this.newAssignments = 
clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new 
HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, final ProcessId 
client, final AssignedTask.Type type) {
-newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-final Set newAssignmentsForClient = 
newAssignments.get(client)
-.stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+final Set newAssignmentsForClient = 
newAssignments.get(client).tasks().keySet();
 taskPairs.addPairs(taskId, newAssignmentsForClient);
-newAssignments.get(client).add(new AssignedTask(taskId, type));
-newTaskLocations.get(taskId).add(client);
+
+newAssignments.get(client).assignTask(new AssignedTask(taskId, 
type));
+newTaskLocations.computeIfAbsent(taskId, k -> new 
HashSet<>()).add(client);
 }
 
-public Map 
buildKafkaStreamsAssignments() {
-final Map 
kafkaStreamsAssignments = new HashMap<>();
-for (final Map.Entry> entry : 
newAssignments.entrySet()) {
-final ProcessId processId = entry.getKey();
-final Set assignedTasks = 
newAssignments.get(processId);
-final KafkaStreamsAssignment assignment = 
KafkaStreamsAssignment.of(processId, assignedTasks);
-kafkaStreamsAssignments.put(processId, assignment);
-}
-return kafkaStreamsAssignments;
+public Map newAssignments() {
+return newAssignments;
 }
 
 public void processOptimizedAssignments(final Map optimizedAssignments) {

Review Comment:
   > now that `#processOptimizedAssignments` is only updating the 
`newTaskLocations`, can we simplify things further by just keeping the 
`newTaskLocations` map up to date as we move tasks around? That way we can get 
rid of `processOptimizedAssignments` altogether
   
   Since the task assignments are mutated rather than "moved" around, and that 
mutation happens frequently outside of the context of this class (by 
`TaskAssignmentUtils` for instance), we can't easily make sure both variables 
change together. We would need this `processOptimizedAssignments` call to 
happen regardless.
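
A rough sketch of what such a resync step could look like, assuming the assignments are reduced to a map from client id to task ids. The names below are hypothetical stand-ins for `ProcessId`, `TaskId` and `KafkaStreamsAssignment`, and this is not the actual `processOptimizedAssignments` implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TaskLocationsSketch {
    // Rebuilds the task -> clients index from scratch after the assignments
    // were mutated elsewhere (for example by a utility class).
    static Map<String, Set<String>> rebuildTaskLocations(final Map<String, Set<String>> assignments) {
        final Map<String, Set<String>> taskLocations = new HashMap<>();
        for (final Map.Entry<String, Set<String>> entry : assignments.entrySet()) {
            for (final String taskId : entry.getValue()) {
                taskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(entry.getKey());
            }
        }
        return taskLocations;
    }
}
```

Deriving the index in one pass after the external mutation avoids having to keep two structures in lockstep at every call site.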






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-03 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624657249


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState 
applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new 
HashSet<>()));
+this.newAssignments = 
clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new 
HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, final ProcessId 
client, final AssignedTask.Type type) {
-newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-final Set newAssignmentsForClient = 
newAssignments.get(client)
-.stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+final Set newAssignmentsForClient = 
newAssignments.get(client).tasks().keySet();
 taskPairs.addPairs(taskId, newAssignmentsForClient);
-newAssignments.get(client).add(new AssignedTask(taskId, type));
-newTaskLocations.get(taskId).add(client);
+
+newAssignments.get(client).assignTask(new AssignedTask(taskId, 
type));
+newTaskLocations.computeIfAbsent(taskId, k -> new 
HashSet<>()).add(client);
 }
 
-public Map 
buildKafkaStreamsAssignments() {
-final Map 
kafkaStreamsAssignments = new HashMap<>();
-for (final Map.Entry> entry : 
newAssignments.entrySet()) {
-final ProcessId processId = entry.getKey();
-final Set assignedTasks = 
newAssignments.get(processId);
-final KafkaStreamsAssignment assignment = 
KafkaStreamsAssignment.of(processId, assignedTasks);
-kafkaStreamsAssignments.put(processId, assignment);
-}
-return kafkaStreamsAssignments;
+public Map newAssignments() {
+return newAssignments;
 }
 
 public void processOptimizedAssignments(final Map optimizedAssignments) {

Review Comment:
   This would involve changing `TaskAssignmentUtils`. Currently it's not 
obvious that those methods mutate the input (and until recently, they didn't). 
The public methods all return `Map`, so I suggest we change them to return void 
and assume mutation everywhere uphill of those calls.






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-03 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624647021


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -331,7 +328,7 @@ public Set findClientsWithoutAssignedTask(final TaskId taskId) {
 
 public double clientLoad(final ProcessId processId) {
 final int capacity = clients.get(processId).numProcessingThreads();
-final double totalTaskCount = newAssignments.getOrDefault(processId, new HashSet<>()).size();
+final double totalTaskCount = newAssignments.getOrDefault(processId, KafkaStreamsAssignment.of(processId, new HashSet<>())).tasks().size();

Review Comment:
   Yep, I'm already doing the initialization as you suggested, but never got 
rid of the `getOrDefault`.
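
For context, a small sketch of why the `getOrDefault` becomes redundant once every client has an entry up front. Client ids and task ids are plain strings here, and the class is a hypothetical stand-in for `AssignmentState`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

final class PreinitializedAssignmentsSketch {
    // Every known client gets an empty assignment at construction time.
    // Note: Collectors.toMap throws IllegalStateException on duplicate ids.
    static Map<String, Set<String>> emptyAssignments(final List<String> clientIds) {
        return clientIds.stream()
            .collect(Collectors.toMap(Function.identity(), id -> new HashSet<>()));
    }

    // A plain get() is enough for known clients; no default value is needed.
    static int taskCount(final Map<String, Set<String>> assignments, final String clientId) {
        return assignments.get(clientId).size();
    }
}
```

The trade-off is that a lookup for an unknown client now fails fast with a `NullPointerException` instead of silently reporting zero tasks.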






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-06-03 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1624625757


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,20 +32,47 @@ public class AssignmentConfigs {
 private final int numStandbyReplicas;
 private final long probingRebalanceIntervalMs;
 private final List rackAwareAssignmentTags;
-private final int rackAwareTrafficCost;
-private final int rackAwareNonOverlapCost;
+private final OptionalInt rackAwareTrafficCost;
+private final OptionalInt rackAwareNonOverlapCost;
 private final String rackAwareAssignmentStrategy;
 
-public AssignmentConfigs(final StreamsConfig configs) {
-this(
-configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+public static AssignmentConfigs of(final StreamsConfig configs) {
+final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+final List rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+Integer rackAwareNonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
+
+final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+if (rackAwareTrafficCost == null) {
+rackAwareTrafficCost = 
StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
+}
+if (rackAwareNonOverlapCost == null) {
+rackAwareNonOverlapCost = 
StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
+}
+} else if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
+// TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor 
class once it implements the new TaskAssignor interface
+if (rackAwareTrafficCost == null) {
+rackAwareTrafficCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST;
+}
+if (rackAwareNonOverlapCost == null) {
+rackAwareNonOverlapCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST;
+}
+}
+
+return new AssignmentConfigs(
+acceptableRecoveryLag,
+maxWarmupReplicas,
+numStandbyReplicas,
+probingRebalanceIntervalMs,
+rackAwareAssignmentTags,
+OptionalInt.of(rackAwareTrafficCost),
+OptionalInt.of(rackAwareNonOverlapCost),

Review Comment:
   The only way to do this is with if/else; `OptionalInt`'s API surface is 
pretty tragically lackluster.
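
As a sketch of the if/else in question: `java.util.OptionalInt` has no `ofNullable`, and passing a null `Integer` to `OptionalInt.of` throws a `NullPointerException` during unboxing. The helper below is hypothetical and is not part of `AssignmentConfigs`.

```java
import java.util.OptionalInt;

final class OptionalIntSketch {
    // Hypothetical equivalent of Optional.ofNullable for a boxed Integer.
    static OptionalInt ofNullable(final Integer value) {
        // OptionalInt.of is only reached, and value only unboxed, when non-null.
        return value == null ? OptionalInt.empty() : OptionalInt.of(value);
    }
}
```

With a helper like this, an unset cost config would map to `OptionalInt.empty()` when neither of the two known assignors is configured, rather than failing on the null.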






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623081838


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState 
applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new 
HashSet<>()));
+this.newAssignments = 
clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new 
HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, final ProcessId 
client, final AssignedTask.Type type) {

Review Comment:
   I guess I overlooked this earlier: these should all be private methods, 
right? Pretty much everything other than `#assign`?



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState 
applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new 
HashSet<>()));
+this.newAssignments = 
clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new 
HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, final ProcessId 
client, final AssignedTask.Type type) {
-newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-final Set newAssignmentsForClient = 
newAssignments.get(client)
-.stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+final Set newAssignmentsForClient = 
newAssignments.get(client).tasks().keySet();
 taskPairs.addPairs(taskId, newAssignmentsForClient);
-newAssignments.get(client).add(new AssignedTask(taskId, type));
-newTaskLocations.get(taskId).add(client);
+
+newAssignments.get(client).assignTask(new AssignedTask(taskId, 
type));
+newTaskLocations.computeIfAbsent(taskId, k -> new 
HashSet<>()).add(client);
 }
 
-public Map 
buildKafkaStreamsAssignments() {
-final Map 
kafkaStreamsAssignments = new HashMap<>();
-for (final Map.Entry> entry : 
newAssignments.entrySet()) {
-final ProcessId processId = entry.getKey();
-final Set assignedTasks = 
newAssignments.get(processId);
-final KafkaStreamsAssignment assignment = 
KafkaStreamsAssignment.of(processId, assignedTasks);
-kafkaStreamsAssignments.put(processId, assignment);
-}
-return kafkaStreamsAssignments;
+public Map newAssignments() {
+return newAssignments;
 }
 
 public void processOptimizedAssignments(final Map optimizedAssignments) {

Review Comment:
   now that `#processOptimizedAssignments` is only updating the 
`newTaskLocations`, can we simplify things further by just keeping the 
`newTaskLocations` map up to date as we move tasks around? That way we can get 
rid of `processOptimizedAssignments` altogether



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState 
applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new 
HashSet<>()));
+this.newAssignments = 
clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new 
HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, fin

Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623079947


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -173,6 +178,7 @@ private static void assignStandby(final ApplicationState applicationState,
   final AssignmentState assignmentState) {
 final Set statefulTasks = applicationState.allTasks().values().stream()
 .filter(TaskInfo::isStateful)

Review Comment:
   you can remove this line actually, the changelogs check is sufficient (not 
to mention I just realized this filter is probably broken at the moment anyways 
since we still need to fix the `stateStoreNames` thing)






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623078889


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -40,13 +41,17 @@
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StickyTaskAssignor implements TaskAssignor {
 private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class);
 
+public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

Review Comment:
   so weird. why did they name it "stateful"? both of these assignors, and 
AFAICT the configs themselves, pertain to both stateless and stateful tasks... 
🤷‍♀️ 






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623064697


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##
@@ -263,6 +267,8 @@ public Optional user
 final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = Utils.newInstance(userTaskAssignorClassname,
 org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
 log.info("Instantiated {} as the task assignor.", userTaskAssignorClassname);
+assignor.configure(streamsConfig.originals());
+log.info("Configured task assignor {} with the StreamsConfig.", userTaskAssignorClassname);

Review Comment:
   I think we only need to log something about the assignor once. The first 
message is good enough, so I'd just remove this one.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -45,8 +45,8 @@
 
 public class HighAvailabilityTaskAssignor implements TaskAssignor {
 private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
-private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
-private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
+public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;

Review Comment:
   Ditto here: let's use different constant names in the different classes. 
It's too confusing if we mix them up now that they're referenced outside of 
this class. So I guess `DEFAULT_HIGH_AVAILABILITY_XXX_COST` (or maybe just 
`DEFAULT_HA_XXX_COST`?)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -1492,8 +1521,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
 encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
 break;
-case 6:
-validateActiveTaskEncoding(partitions, info, logPrefix);
+case 6: validateActiveTaskEncoding(partitions, info, logPrefix);

Review Comment:
   accidental formatting change?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -790,8 +813,14 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada
 final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
 final TaskAssignment taskAssignment = assignor.assign(applicationState);
 final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
-processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
-userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+processStreamsPartitionAssignment(assignor, taskAssignment, assignmentError, clientMetadataMap, groupSubscription);
+userTaskAssignmentListener = (assignment, subscription) -> {
+assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+if (assignmentError != AssignmentError.NONE) {
+throw new StreamsException("Task assignment with " + assignor.getClass() +

Review Comment:
   Ditto here: log an error before throwing, and change `assignor.getClass()` 
to `assignor.getClass().getName()`.
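
A small sketch of the difference the `getName()` change makes when the class is concatenated into a message. The " failed" suffix is a placeholder, since the real message text is cut off in the diff above, and the logging step is omitted to keep the example free of dependencies.

```java
final class AssignorNameSketch {
    // Concatenating a Class object calls Class.toString(), which prefixes
    // the name with "class " (or "interface ").
    static String withClass(final Object assignor) {
        return "Task assignment with " + assignor.getClass() + " failed";
    }

    // getName() yields only the fully qualified class name.
    static String withClassName(final Object assignor) {
        return "Task assignment with " + assignor.getClass().getName() + " failed";
    }
}
```

For a `String` argument, the first variant reads "Task assignment with class java.lang.String failed" and the second "Task assignment with java.lang.String failed".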



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -573,14 +586,23 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
 ));
 
 return new DefaultApplicationState(
-assignmentConfigs.toPublicAssignmentConfigs(),
+publicAssignmentConfigs,
 logicalTasks,
 clientMetadataMap
 );
 }
 
-private static void processStreamsPartitionAssignment(final Map clientMetadataMap,
-  final TaskAssignment 
taskAssignment) {
+private static void processStreamsPartitionAssignment(final 
org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+  final TaskAssignment 
taskAssignment,
+  final 
AssignmentError assignmentError,
+  final Map clientMetadataMap,
+  final 
GroupSubscription groupSubscription) {
+if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || 
assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+assignor.onA

Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623063757


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,20 +32,47 @@ public class AssignmentConfigs {
 private final int numStandbyReplicas;
 private final long probingRebalanceIntervalMs;
 private final List rackAwareAssignmentTags;
-private final int rackAwareTrafficCost;
-private final int rackAwareNonOverlapCost;
+private final OptionalInt rackAwareTrafficCost;
+private final OptionalInt rackAwareNonOverlapCost;
 private final String rackAwareAssignmentStrategy;
 
-public AssignmentConfigs(final StreamsConfig configs) {
-this(
-configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+public static AssignmentConfigs of(final StreamsConfig configs) {
+final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+final List rackAwareAssignmentTags = 
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+Integer rackAwareNonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
+
+final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+if (rackAwareTrafficCost == null) {
+rackAwareTrafficCost = 
StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
+}
+if (rackAwareNonOverlapCost == null) {
+rackAwareNonOverlapCost = 
StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
+}
+} else if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
+// TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor 
class once it implements the new TaskAssignor interface
+if (rackAwareTrafficCost == null) {
+rackAwareTrafficCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST;
+}
+if (rackAwareNonOverlapCost == null) {
+rackAwareNonOverlapCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST;
+}
+}
+
+return new AssignmentConfigs(
+acceptableRecoveryLag,
+maxWarmupReplicas,
+numStandbyReplicas,
+probingRebalanceIntervalMs,
+rackAwareAssignmentTags,
+OptionalInt.of(rackAwareTrafficCost),
+OptionalInt.of(rackAwareNonOverlapCost),

Review Comment:
   Does this need to be `ofNullable`?






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623054265


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));

Review Comment:
   We need to map from Integer anyway, because otherwise the 
`OptionalInt.of(int)` construction will NPE.
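
To illustrate the point about mapping from `Integer`, here is a minimal sketch of the conversion. It assumes only the JDK; `toOptionalInt` is a hypothetical helper name and is not part of the PR. `OptionalInt` has no `ofNullable` factory, so the null check has to be written out.

```java
import java.util.OptionalInt;

public class NullableCostExample {
    // Hypothetical helper (not from the PR): converts a possibly-null boxed
    // Integer into an OptionalInt without ever unboxing a null.
    static OptionalInt toOptionalInt(final Integer value) {
        return value == null ? OptionalInt.empty() : OptionalInt.of(value);
    }

    public static void main(final String[] args) {
        final Integer unset = null;
        try {
            // OptionalInt.of takes a primitive int, so the null Integer is
            // auto-unboxed first, which throws before of() is even entered.
            OptionalInt.of(unset);
        } catch (final NullPointerException e) {
            System.out.println("OptionalInt.of(null Integer) threw NullPointerException");
        }
        System.out.println(toOptionalInt(unset).isPresent());
        System.out.println(toOptionalInt(10).getAsInt());
    }
}
```

A ternary like this keeps the field type as `OptionalInt` while still tolerating an unset config value.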






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623057182


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -40,13 +41,17 @@
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StickyTaskAssignor implements TaskAssignor {
     private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class);
 
+    public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+    public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

Review Comment:
   I just copied these from the original internal StickyTaskAssignor, for the 
sake of consistency. I'll change them to `DEFAULT_STICKY_*` though.






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623047318


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+        Optional<Integer> rackAwareTrafficCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+        Optional<Integer> rackAwareNonOverlapCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+
+        final String assignorClassName = configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+        if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+            if (!rackAwareTrafficCost.isPresent()) {
+                rackAwareTrafficCost = Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+            }
+            if (!rackAwareNonOverlapCost.isPresent()) {
+                rackAwareNonOverlapCost = Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+            }
+        }
+

Review Comment:
   nit: should be an else-if rather than two separate if statements



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
     private final int numStandbyReplicas;
     private final long probingRebalanceIntervalMs;
     private final List<String> rackAwareAssignmentTags;
-    private final int rackAwareTrafficCost;
-    private final int rackAwareNonOverlapCost;
+    private final OptionalInt rackAwareTrafficCost;
+    private final OptionalInt rackAwareNonOverlapCost;
     private final String rackAwareAssignmentStrategy;
 
-    public AssignmentConfigs(final StreamsConfig configs) {
-        this(
-            configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-            configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-            configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-            configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-            configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-            configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+    public static AssignmentConfigs of(final StreamsConfig configs) {
+        final long acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+        final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+        final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+        final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+        final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+        final String rackAwareAssign

Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1622591542


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa
      */
     private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState,
                                                            final AssignedTask.Type taskType) {
-        final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+        final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs();
+        final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy();
         if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) {
+            LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}",
+                rackAwareAssignmentStrategy);
+            return false;
+        }
+
+        if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {

Review Comment:
   `isEmpty` is JDK 11 unfortunately:
   
https://docs.oracle.com/en%2Fjava%2Fjavase%2F11%2Fdocs%2Fapi%2F%2F/java.base/java/util/Optional.html#isEmpty()
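
For reference, a minimal sketch of the Java 8 compatible check being discussed. `isAbsent` is a hypothetical helper name used only for illustration; the PR itself inlines `!...isPresent()`.

```java
import java.util.OptionalInt;

public class Java8OptionalCheck {
    // Hypothetical helper: OptionalInt.isEmpty() was added in Java 11, so on a
    // Java 8 target the absence check is the negation of isPresent().
    static boolean isAbsent(final OptionalInt value) {
        return !value.isPresent();
    }

    public static void main(final String[] args) {
        System.out.println(isAbsent(OptionalInt.empty()));
        System.out.println(isAbsent(OptionalInt.of(1)));
    }
}
```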






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-30 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1621532246


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa
      */
     private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState,
                                                            final AssignedTask.Type taskType) {
-        final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+        final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs();
+        final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy();
         if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) {
+            LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}",
+                rackAwareAssignmentStrategy);
+            return false;
+        }
+
+        if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
+            LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set.");

Review Comment:
   We should log the exact config name since otherwise people won't necessarily 
know what this is referring to (especially since they already forgot to set 
this config). 
   
   ```suggestion
               LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
   ```



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa
      */
     private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState,
                                                            final AssignedTask.Type taskType) {
-        final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+        final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs();
+        final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy();
         if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) {
+            LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}",
+                rackAwareAssignmentStrategy);
+            return false;
+        }
+
+        if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
+            LOG.warn("Rack aware task assignment optimization unavailable: the traffic cost configuration was not set.");
             return false;
         }
+
+        if (!assignmentConfigs.rackAwareNonOverlapCost().isPresent()) {
+            LOG.warn("Rack aware task assignment optimization unavailable: the non-overlap cost configuration was not set.");

Review Comment:
   ```suggestion
               LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
   ```
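
As a rough sketch of what the two suggestions amount to, the gate can report which config key is missing rather than a generic description. The key strings and the `"none"` strategy value below are assumptions standing in for the `StreamsConfig` constants, and `unavailableReason` is a hypothetical name; the real method logs via SLF4J and returns a boolean.

```java
import java.util.Optional;
import java.util.OptionalInt;

public class RackAwareGateExample {
    // Assumed key names, standing in for the StreamsConfig constants.
    static final String TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost";
    static final String NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost";

    // Hypothetical helper: returns why the optimization cannot run, naming the
    // exact config key, or Optional.empty() when every precondition holds.
    static Optional<String> unavailableReason(final String strategy,
                                              final OptionalInt trafficCost,
                                              final OptionalInt nonOverlapCost) {
        if ("none".equals(strategy)) {
            return Optional.of("rack aware strategy was set to " + strategy);
        }
        if (!trafficCost.isPresent()) {
            return Optional.of("must configure " + TRAFFIC_COST_CONFIG);
        }
        if (!nonOverlapCost.isPresent()) {
            return Optional.of("must configure " + NON_OVERLAP_COST_CONFIG);
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        // Traffic cost is unset here, so the reason names that key.
        System.out.println(unavailableReason("min_traffic", OptionalInt.empty(), OptionalInt.of(10)));
        System.out.println(unavailableReason("min_traffic", OptionalInt.of(1), OptionalInt.of(10)));
    }
}
```

Naming the key in the message means a user who forgot the setting can search for it directly.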



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -40,8 +41,8 @@ public AssignmentConfigs(final StreamsConfig configs) {
             configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
             configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
             configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-            configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
+            Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)),
+            Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)),

Review Comment:
   Don't we need to check `if (assignorClassName.equals("org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor"))` and set these to the sticky assignor defaults if true?
   
   Where `assignorClassName` is equal to `streamsConfig.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG)` -- I guess maybe we do want the public `AssignmentConfigs` constructor to take in the StreamsConfig after all?
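
A minimal sketch of the defaulting being asked about, written as a single else-if chain. It assumes the user-set value wins, then a per-assignor default, then empty for any other assignor. The high-availability class name and its default value are placeholders, not taken from the PR; the sticky default of 1 mirrors the constant quoted in the `StickyTaskAssignor` diff.

```java
import java.util.OptionalInt;

public class AssignorDefaultsExample {
    static final String STICKY =
        "org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor";
    // Placeholder class name and value, for illustration only.
    static final String HIGH_AVAILABILITY = "example.HighAvailabilityTaskAssignor";
    static final int STICKY_TRAFFIC_COST = 1;
    static final int HA_TRAFFIC_COST = 10;

    // Hypothetical helper: explicit config first, then the assignor-specific
    // default, otherwise empty so callers can skip the rack-aware optimization.
    static OptionalInt resolveTrafficCost(final Integer configured, final String assignorClassName) {
        if (configured != null) {
            return OptionalInt.of(configured);
        } else if (STICKY.equals(assignorClassName)) {
            return OptionalInt.of(STICKY_TRAFFIC_COST);
        } else if (HIGH_AVAILABILITY.equals(assignorClassName)) {
            return OptionalInt.of(HA_TRAFFIC_COST);
        }
        return OptionalInt.empty();
    }

    public static void main(final String[] args) {
        System.out.println(resolveTrafficCost(null, STICKY));
        System.out.println(resolveTrafficCost(5, STICKY));
        System.out.println(resolveTrafficCost(null, "com.example.CustomAssignor"));
    }
}
```

Returning `OptionalInt.empty()` for an unrecognized assignor avoids unboxing a null when the result is handed to the `AssignmentConfigs` constructor.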



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
  

[PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-30 Thread via GitHub


apourchet opened a new pull request, #16147:
URL: https://github.com/apache/kafka/pull/16147

   This PR takes care of making the callback to `TaskAssignor.onAssignmentComputed`.
   
   It also contains a change to the public AssignmentConfigs API, as well as 
some simplifications of the StickyTaskAssignor.
   
   This PR also changes the rack information fetching to happen lazily in the 
case where the TaskAssignor makes its decisions without said rack information.
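
The lazy fetching described above could be sketched as a memoizing supplier. This is only an illustration of the general technique, not the PR's actual implementation; the `Lazy` class and the `"rack-a"` value are hypothetical, and this version is not thread-safe.

```java
import java.util.function.Supplier;

public class LazyRackInfoExample {
    // Hypothetical memoizing wrapper: the delegate runs at most once, and only
    // when a caller actually asks for the value.
    static final class Lazy<T> implements Supplier<T> {
        private final Supplier<T> delegate;
        private T value;
        private boolean computed;

        Lazy(final Supplier<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public T get() {
            if (!computed) {
                value = delegate.get();
                computed = true;
            }
            return value;
        }
    }

    public static void main(final String[] args) {
        final int[] fetches = {0};
        // Stand-in for an expensive rack metadata lookup.
        final Supplier<String> rackInfo = new Lazy<>(() -> {
            fetches[0]++;
            return "rack-a";
        });
        System.out.println("fetches before use: " + fetches[0]);
        rackInfo.get();
        rackInfo.get();
        System.out.println("fetches after two reads: " + fetches[0]);
    }
}
```

An assignor that never reads the rack information never triggers the lookup at all.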
   

