[PR] KAFKA-16861: Don't convert the group to classic if the size is larger than the group max size. [kafka]
frankvicky opened a new pull request, #16163:
URL: https://github.com/apache/kafka/pull/16163

Fix the bug where the group is downgraded to a classic group when a member leaves, even though the consumer group size is still larger than `classicGroupMaxSize`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
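A minimal sketch of the guard this fix describes: before converting a consumer group back to a classic group after a member leaves, compare the remaining size with `classicGroupMaxSize`. The class and method names are hypothetical (this is not the group coordinator's actual code), and the other downgrade preconditions are collapsed into a single assumed boolean.

```java
/**
 * Hypothetical helper deciding whether a consumer group may be converted back to a classic group
 * after a member leaves. Not the actual group coordinator code.
 */
final class ClassicDowngradeGuard {

    private ClassicDowngradeGuard() {
    }

    /**
     * @param membersAfterLeave     number of members remaining once the leaving member is removed
     * @param classicGroupMaxSize   maximum number of members a classic group may hold
     * @param otherPreconditionsMet assumed flag standing in for the remaining downgrade checks
     * @return true only if the downgrade is allowed
     */
    static boolean shouldDowngradeToClassic(int membersAfterLeave,
                                            int classicGroupMaxSize,
                                            boolean otherPreconditionsMet) {
        // The bug being fixed: never create a classic group that already exceeds its size limit.
        if (membersAfterLeave > classicGroupMaxSize) {
            return false;
        }
        return otherPreconditionsMet;
    }
}
```

A group exactly at the limit may still be downgraded; only a group that is *larger* than `classicGroupMaxSize` stays a consumer group, matching the wording of the PR title.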
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chiacyu commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1623158977

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
## @@ -1184,6 +1185,141 @@ public void testRestoreRestartRequestInconsistentState() {
verify(configLog).stop();
}
+@Test
+public void testPutTaskConfigsZeroTasks() throws Exception {
+when(configLog.partitionCount()).thenReturn(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+// Bootstrap as if we had already added the connector, but no tasks had been added yet
+whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
+
+// Null before writing
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+
+// Task configs should read to end, write to the log, read to end, write root.
+doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd();
+
+expectConvertWriteRead(
+COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+"tasks", 0); // We have 0 tasks
+
+configStorage.putTaskConfigs("connector1", Collections.emptyList());
+
+// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+configUpdateListener.onTaskConfigUpdate(Collections.emptyList());

Review Comment:
Thanks for the example, I will apply it.
Re: [PR] MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dajac commented on PR #16145:
URL: https://github.com/apache/kafka/pull/16145#issuecomment-2143311247

@dongnuo123 There are failed tests that seem related to the patch. Could you please take a look?
[jira] [Created] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
Rohan Desai created KAFKA-16876:

---

Summary: TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit
Key: KAFKA-16876
URL: https://issues.apache.org/jira/browse/KAFKA-16876
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.6.0
Reporter: Rohan Desai

`TaskManager.handleRevocation` does not handle exceptions thrown by `task.prepareCommit`. In the particular instance I observed, `prepareCommit` flushed caches, which led to downstream `producer.send` calls that threw a `TaskMigratedException`. This means that the tasks that need to be revoked are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown exception and then moves on to the other task assignment callbacks. One of these, `StreamsPartitionAssignor.onAssignment`, tries to close the tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if the clean close fails, so we don't leak any tasks.

I think there may be two bugs here:
# `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It should try not to leave any revoked tasks in an unsuspended state.
# The `ConsumerCoordinator` just throws the first exception that it sees. But it seems bad to throw the `TaskMigratedException` and drop the `IllegalStateException` (though in this case I think it's relatively benign). I think on `IllegalStateException` we really want the streams thread to exit. One idea here is to have `ConsumerCoordinator` throw an exception type that carries the other exceptions it has seen in another field. But this breaks the contract for clients that catch specific exceptions. I'm not sure of a clean solution, but I think it's at least worth recording that it would be preferable to have the caller of `poll` handle all the thrown exceptions rather than just the first one.
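The first suggestion (suspend every revoked task even when `prepareCommit` throws) and the wish not to drop later exceptions can be sketched together as below. This is a hypothetical, simplified model: `RevocableTask` is not the real Kafka Streams `Task` interface, and attaching later failures with `Throwable.addSuppressed` is one standard-Java option, not something the issue or Kafka prescribes.

```java
import java.util.List;

final class RevocationSketch {

    /** Hypothetical stand-in for a stream task; not the real Kafka Streams Task interface. */
    interface RevocableTask {
        void prepareCommit(); // may throw, e.g. when flushing caches triggers a failing producer.send

        void suspend();
    }

    private RevocationSketch() {
    }

    /**
     * Runs prepareCommit() and suspend() for every revoked task, even if some calls throw.
     * The first failure is rethrown with its original type; later ones are attached via addSuppressed().
     */
    static void handleRevocation(List<? extends RevocableTask> revokedTasks) {
        RuntimeException firstFailure = null;
        for (RevocableTask task : revokedTasks) {
            firstFailure = runAndRecord(firstFailure, task::prepareCommit);
            // Suspend regardless of whether prepareCommit() succeeded, so no revoked task is left unsuspended.
            firstFailure = runAndRecord(firstFailure, task::suspend);
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    private static RuntimeException runAndRecord(RuntimeException firstFailure, Runnable action) {
        try {
            action.run();
            return firstFailure;
        } catch (RuntimeException e) {
            if (firstFailure == null) {
                return e;
            }
            if (e != firstFailure) {
                firstFailure.addSuppressed(e); // keep later failures instead of dropping them
            }
            return firstFailure;
        }
    }
}
```

Because the first exception is rethrown with its original type, callers that catch a specific exception type keep working, while the other failures remain reachable through `getSuppressed()`.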
Here is the IllegalStateException stack trace I observed:
{code:java}
[ 508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing:
[ 508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
[ 508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
[
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1623140655

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
## @@ -1315,4 +1451,22 @@ private Map structToMap(Struct struct) {
for (Field field : struct.schema().fields())
result.put(field.name(), struct.get(field));
return result;
}
+
+// Manually insert a connector into config storage, updating the task configs, connector config, and root config
+private void whiteBoxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {

Review Comment:
We don't use `whiteBox` now, so that can be simplified:
```java
private void addConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
    // Register each task config under its ConnectorTaskId (task index = position in the list)
    for (int i = 0; i < taskConfigs.size(); i++) {
        configStorage.taskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
    }
    configStorage.connectorConfigs.put(connectorName, connectorConfig);
    configStorage.connectorTaskCounts.put(connectorName, taskConfigs.size());
}
```
Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]
chia7712 commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1623140561

## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
## @@ -1184,6 +1185,141 @@ public void testRestoreRestartRequestInconsistentState() {
verify(configLog).stop();
}
+@Test
+public void testPutTaskConfigsZeroTasks() throws Exception {
+when(configLog.partitionCount()).thenReturn(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+// Bootstrap as if we had already added the connector, but no tasks had been added yet
+whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
+
+// Null before writing
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+
+// Task configs should read to end, write to the log, read to end, write root.
+doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd();
+
+expectConvertWriteRead(
+COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+"tasks", 0); // We have 0 tasks
+
+configStorage.putTaskConfigs("connector1", Collections.emptyList());
+
+// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+configUpdateListener.onTaskConfigUpdate(Collections.emptyList());

Review Comment:
Please see the following example:
```java
@Test
public void testPutTaskConfigsZeroTasks() throws Exception {
    configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
    verifyConfigure();
    configStorage.start();
    verify(configLog).start();

    // Records to be read by consumer as it reads to the end of the log:
    // the first readToEnd() sees nothing, the second sees the commit record
    doAnswer(expectReadToEnd(new LinkedHashMap<>()))
            .doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0))))
            .when(configLog).readToEnd();

    expectConvertWriteRead(
            COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
            "tasks", 0); // We have 0 tasks

    // Bootstrap as if we had already added the connector, but no tasks had been added yet
    addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());

    // Null before writing
    ClusterConfigState configState = configStorage.snapshot();
    assertEquals(-1, configState.offset());

    // Writing task configs should block until all the writes have been performed and the root record update
    // has completed
    List<Map<String, String>> taskConfigs = Collections.emptyList();
    configStorage.putTaskConfigs("connector1", taskConfigs);

    // Validate root config by listing all connectors and tasks
    configState = configStorage.snapshot();
    assertEquals(1, configState.offset());
    String connectorName = CONNECTOR_IDS.get(0);
    assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
    assertEquals(Collections.emptyList(), configState.tasks(connectorName));
    assertEquals(Collections.emptySet(), configState.inconsistentConnectors());

    // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
    verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList());

    configStorage.stop();
    verify(configLog).stop();
}
```
Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]
ganesh-sadanala commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1623138681

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+"\n" +

Review Comment:
The newlines are not needed.
[PR] Sliding window approach to calculate non-zero punctuate-ratio metric [kafka]
ganesh-sadanala opened a new pull request, #16162:
URL: https://github.com/apache/kafka/pull/16162

This pull request changes how the `punctuate-ratio` metric is calculated. The current implementation calculates the metric after the last record of the poll loop. Right after a punctuation the value is close to 1, but there is little chance that the metric is sampled at that moment, so its value is almost always 0. The updated implementation calculates the metric value over a sliding window covering the last 30 seconds.
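A sliding-window ratio can be sketched without Kafka's metrics library as follows. `SlidingWindowRatio` is a hypothetical class; the PR itself may build on Kafka's existing sampled-stat machinery instead. For simplicity, a punctuation counts in full as long as it finished inside the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sliding-window ratio: the fraction of the last windowMs spent punctuating.
 * Simplification: a punctuation counts in full if it finished inside the window.
 */
final class SlidingWindowRatio {

    private static final class Sample {
        final long endMs;
        final long durationMs;

        Sample(long endMs, long durationMs) {
            this.endMs = endMs;
            this.durationMs = durationMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long busyMsInWindow = 0L;

    SlidingWindowRatio(long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        this.windowMs = windowMs;
    }

    /** Records a punctuation that took durationMs and finished at endMs (end times must not decrease). */
    void record(long endMs, long durationMs) {
        samples.addLast(new Sample(endMs, durationMs));
        busyMsInWindow += durationMs;
    }

    /** Ratio over the window (nowMs - windowMs, nowMs]; stays non-zero while a punctuation is in the window. */
    double ratio(long nowMs) {
        // Evict punctuations that finished before the window started.
        while (!samples.isEmpty() && samples.peekFirst().endMs <= nowMs - windowMs) {
            busyMsInWindow -= samples.pollFirst().durationMs;
        }
        return Math.min(1.0, (double) busyMsInWindow / windowMs);
    }
}
```

With a 30-second window, two 3-second punctuations ending at 5s and 10s give a ratio of 0.2 when read at 20s, 0.1 at 36s (the first one has aged out), and 0 at 50s, instead of a value that is almost always 0.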
Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]
ganesh-sadanala commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1623135976

## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+"\n" +
+"When the lag for a remote partition exceeds the offset.lag.max value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
+"\n" +
+"Setting offset.lag.max to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" +

Review Comment:
Agree!
Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
chia7712 commented on PR #16158:
URL: https://github.com/apache/kafka/pull/16158#issuecomment-2143262308

Out of curiosity, do we have an IT for that option? I grepped the code base, and it seems the related ITs are running with the old coordinator/protocol. For example:
1. https://github.com/apache/kafka/blob/fb566e48bf05d749f8db8da803a3570acf25bb11/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L1213
2. https://github.com/apache/kafka/blob/fb566e48bf05d749f8db8da803a3570acf25bb11/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala#L139
Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]
m1a2st commented on PR #16042:
URL: https://github.com/apache/kafka/pull/16042#issuecomment-2143228875

I have already rebased the code.
Re: [PR] MINOR: speed up reset consumer group offset test [kafka]
m1a2st commented on PR #16155:
URL: https://github.com/apache/kafka/pull/16155#issuecomment-2143228699

OK, I resolved the conflict.
Re: [PR] MINOR: speed up reset consumer group offset test [kafka]
chia7712 commented on PR #16155:
URL: https://github.com/apache/kafka/pull/16155#issuecomment-2143225807

@m1a2st Could you please fix the conflicts?
Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]
chia7712 commented on PR #16042:
URL: https://github.com/apache/kafka/pull/16042#issuecomment-2143225345

@m1a2st Could you rebase the code to include the newest fixes?
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623081838

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
## @@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
final int maxPairs = taskCount * (taskCount - 1) / 2;
this.taskPairs = new TaskPairs(maxPairs);
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+));
}
public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {

Review Comment:
I overlooked this earlier, I guess: these should all be private methods, right? Pretty much everything other than `#assign`?

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
## @@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
final int maxPairs = taskCount * (taskCount - 1) / 2;
this.taskPairs = new TaskPairs(maxPairs);
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+));
}
public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-final Set newAssignmentsForClient = newAssignments.get(client)
-.stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+final Set newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
taskPairs.addPairs(taskId, newAssignmentsForClient);
-newAssignments.get(client).add(new AssignedTask(taskId, type));
-newTaskLocations.get(taskId).add(client);
+
+newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
}
-public Map buildKafkaStreamsAssignments() {
-final Map kafkaStreamsAssignments = new HashMap<>();
-for (final Map.Entry> entry : newAssignments.entrySet()) {
-final ProcessId processId = entry.getKey();
-final Set assignedTasks = newAssignments.get(processId);
-final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-kafkaStreamsAssignments.put(processId, assignment);
-}
-return kafkaStreamsAssignments;
+public Map newAssignments() {
+return newAssignments;
}
public void processOptimizedAssignments(final Map optimizedAssignments) {

Review Comment:
Now that `#processOptimizedAssignments` is only updating the `newTaskLocations`, can we simplify things further by just keeping the `newTaskLocations` map up to date as we move tasks around? That way we can get rid of `processOptimizedAssignments` altogether.

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
## @@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
final int maxPairs = taskCount * (taskCount - 1) / 2;
this.taskPairs = new TaskPairs(maxPairs);
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+));
}
public void finalizeAssignment(final TaskId taskId, fin
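The suggestion to keep `newTaskLocations` current while tasks move, instead of reconciling it afterwards in `processOptimizedAssignments`, can be sketched with two maps that are always updated together. `TaskLocationIndex` and the plain `String` ids are hypothetical simplifications of `AssignmentState`, `TaskId` and `ProcessId`, not the assignor's real types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the assignor's bookkeeping: both views are updated together on every
 * change, so no separate pass is needed to rebuild task locations afterwards.
 */
final class TaskLocationIndex {

    private final Map<String, Set<String>> tasksByClient = new HashMap<>();
    private final Map<String, Set<String>> clientsByTask = new HashMap<>();

    void assign(String taskId, String clientId) {
        tasksByClient.computeIfAbsent(clientId, k -> new HashSet<>()).add(taskId);
        clientsByTask.computeIfAbsent(taskId, k -> new HashSet<>()).add(clientId);
    }

    /** Moves a task between clients, keeping both maps consistent. */
    void move(String taskId, String fromClient, String toClient) {
        Set<String> fromTasks = tasksByClient.get(fromClient);
        if (fromTasks == null || !fromTasks.remove(taskId)) {
            throw new IllegalArgumentException(taskId + " is not assigned to " + fromClient);
        }
        clientsByTask.get(taskId).remove(fromClient);
        assign(taskId, toClient);
    }

    Set<String> clientsOf(String taskId) {
        return Collections.unmodifiableSet(clientsByTask.getOrDefault(taskId, Collections.emptySet()));
    }

    Set<String> tasksOf(String clientId) {
        return Collections.unmodifiableSet(tasksByClient.getOrDefault(clientId, Collections.emptySet()));
    }
}
```

Since every mutation goes through `assign` or `move`, the reverse index can never drift from the forward one, which is what makes a later reconciliation step unnecessary.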
Re: [PR] KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. [kafka]
frankvicky commented on code in PR #16156:
URL: https://github.com/apache/kafka/pull/16156#discussion_r1622619577

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
## @@ -298,7 +298,7 @@ private void sendUnsentRequests(final Timer timer) {
do {

Review Comment:
Cool. I don't think we need this early return statement anymore, because we also need to consider whether there are any in-flight requests. The statement was only here because we didn't consider in-flight requests before, so we could return as soon as there were no unsent requests. 🤔
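The point about in-flight requests can be illustrated with a small drain loop: it keeps polling while anything is unsent *or* still in flight, and stops only when both are empty or the shutdown deadline passes. `PendingRequests` and `drain` are hypothetical names; the real `ConsumerNetworkThread` works with Kafka's `Timer` and network client classes, which are replaced here by a deadline and a clock function.

```java
import java.util.function.LongSupplier;

final class ShutdownDrain {

    /** Hypothetical view of pending network work; not the real Kafka network client API. */
    interface PendingRequests {
        boolean hasUnsentRequests();

        boolean hasInflightRequests();

        /** Sends what it can and processes any responses, waiting at most timeoutMs. */
        void poll(long timeoutMs);
    }

    private ShutdownDrain() {
    }

    /**
     * Polls until nothing is unsent or in flight, or until the deadline passes.
     * There is deliberately no early return when only the unsent queue is empty,
     * because in-flight requests still need their responses processed.
     *
     * @return true if all requests completed before the deadline
     */
    static boolean drain(PendingRequests requests, LongSupplier clockMs, long deadlineMs) {
        while (requests.hasUnsentRequests() || requests.hasInflightRequests()) {
            long remainingMs = deadlineMs - clockMs.getAsLong();
            if (remainingMs <= 0) {
                return false;
            }
            requests.poll(remainingMs);
        }
        return true;
    }
}
```

With no unsent requests but two in flight, a loop that returned early on "nothing unsent" would exit immediately; this version keeps polling until the responses arrive or time runs out.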
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623079947

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
## @@ -173,6 +178,7 @@ private static void assignStandby(final ApplicationState applicationState,
final AssignmentState assignmentState) {
final Set statefulTasks = applicationState.allTasks().values().stream()
.filter(TaskInfo::isStateful)

Review Comment:
You can actually remove this line; the changelogs check is sufficient (not to mention I just realized this filter is probably broken at the moment anyway, since we still need to fix the `stateStoreNames` thing).
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623078889

## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
## @@ -40,13 +41,17 @@ import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StickyTaskAssignor implements TaskAssignor {
private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class);
+public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

Review Comment:
So weird. Why did they name it "stateful"? Both of these assignors, and AFAICT the configs themselves, pertain to both stateless and stateful tasks... 🤷♀️
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1623064697 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java: ## @@ -263,6 +267,8 @@ public Optional user final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = Utils.newInstance(userTaskAssignorClassname, org.apache.kafka.streams.processor.assignment.TaskAssignor.class); log.info("Instantiated {} as the task assignor.", userTaskAssignorClassname); +assignor.configure(streamsConfig.originals()); +log.info("Configured task assignor {} with the StreamsConfig.", userTaskAssignorClassname); Review Comment: I think we only need to log something once about the assignor, the first one is good enough so I'd just remove this ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java: ## @@ -45,8 +45,8 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class); -private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; -private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1; +public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10; Review Comment: ditto here: let's use a different variable name for the different classes, too confusing if we're mixing them up by calling them outside of this class now. So I guess `DEFAULT_HIGH_AVAILABILITY_XXX_COST` (or maybe just `DEFAULT_HA_XXX_COST`?) 
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -1492,8 +1521,7 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost); encodedNextScheduledRebalanceMs = Long.MAX_VALUE; break; -case 6: -validateActiveTaskEncoding(partitions, info, logPrefix); +case 6: validateActiveTaskEncoding(partitions, info, logPrefix); Review Comment: accidental formatting change? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -790,8 +813,14 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get(); final TaskAssignment taskAssignment = assignor.assign(applicationState); final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment); -processStreamsPartitionAssignment(clientMetadataMap, taskAssignment); -userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError); +processStreamsPartitionAssignment(assignor, taskAssignment, assignmentError, clientMetadataMap, groupSubscription); +userTaskAssignmentListener = (assignment, subscription) -> { +assignor.onAssignmentComputed(assignment, subscription, assignmentError); +if (assignmentError != AssignmentError.NONE) { +throw new StreamsException("Task assignment with " + assignor.getClass() + Review Comment: ditto here: log an error before throwing and change assignor.getClass() to assignor.getClass().getName() ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -573,14 +586,23 @@ private ApplicationState buildApplicationState(final TopologyMetadata topologyMe )); return new DefaultApplicationState( -assignmentConfigs.toPublicAssignmentConfigs(), 
+publicAssignmentConfigs, logicalTasks, clientMetadataMap ); } -private static void processStreamsPartitionAssignment(final Map clientMetadataMap, - final TaskAssignment taskAssignment) { +private static void processStreamsPartitionAssignment(final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor, + final TaskAssignment taskAssignment, + final AssignmentError assignmentError, + final Map clientMetadataMap, + final GroupSubscription groupSubscription) { +if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || assignmentError == AssignmentError.UNKNOWN_TASK_ID) { +assignor.onA
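The review asks for an error log before the `StreamsException` is thrown, and for `assignor.getClass().getName()` instead of `assignor.getClass()`. The difference matters because concatenating a `Class` calls `Class.toString()`, which prepends `class ` (for example `class java.lang.String`), while `getName()` returns only the fully qualified name. Below is a minimal sketch under a few stand-in assumptions. A hypothetical `AssignmentErrorSketch` enum replaces the real `AssignmentError`. `IllegalStateException` replaces `StreamsException`, and `java.util.logging` replaces SLF4J. The message suffix is invented because the original message is truncated.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for Kafka Streams' AssignmentError (only a few values).
enum AssignmentErrorSketch { NONE, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID }

final class AssignmentErrorReporting {
    private static final Logger LOG = Logger.getLogger(AssignmentErrorReporting.class.getName());

    // getName() yields "com.example.MyAssignor"; concatenating the Class object itself
    // would yield "class com.example.MyAssignor" via Class.toString().
    static String errorMessage(Object assignor, AssignmentErrorSketch error) {
        // The " failed with error " wording is invented; the original message is truncated.
        return "Task assignment with " + assignor.getClass().getName() + " failed with error " + error;
    }

    // Log at error level first, then throw (IllegalStateException stands in for StreamsException).
    static void failIfError(Object assignor, AssignmentErrorSketch error) {
        if (error != AssignmentErrorSketch.NONE) {
            String message = errorMessage(assignor, error);
            LOG.severe(message);
            throw new IllegalStateException(message);
        }
    }
}
```

One plausible reason for logging first is that the failure stays visible in the client logs even if the exception is caught and wrapped further up the rebalance path.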
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1623063757 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ## @@ -29,20 +32,47 @@ public class AssignmentConfigs { private final int numStandbyReplicas; private final long probingRebalanceIntervalMs; private final List rackAwareAssignmentTags; -private final int rackAwareTrafficCost; -private final int rackAwareNonOverlapCost; +private final OptionalInt rackAwareTrafficCost; +private final OptionalInt rackAwareNonOverlapCost; private final String rackAwareAssignmentStrategy; -public AssignmentConfigs(final StreamsConfig configs) { -this( -configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG), -configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG), -configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), - configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), -configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), - configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG) +public static AssignmentConfigs of(final StreamsConfig configs) { +final long acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); +final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); +final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); +final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); +final List rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); +final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); +Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); +Integer rackAwareNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); + +final String assignorClassName = configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); +if (StickyTaskAssignor.class.getName().equals(assignorClassName)) { +if (rackAwareTrafficCost == null) { +rackAwareTrafficCost = StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST; +} +if (rackAwareNonOverlapCost == null) { +rackAwareNonOverlapCost = StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST; +} +} else if (HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) { +// TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor class once it implements the new TaskAssignor interface +if (rackAwareTrafficCost == null) { +rackAwareTrafficCost = HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST; +} +if (rackAwareNonOverlapCost == null) { +rackAwareNonOverlapCost = HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST; +} +} + +return new AssignmentConfigs( +acceptableRecoveryLag, +maxWarmupReplicas, +numStandbyReplicas, +probingRebalanceIntervalMs, +rackAwareAssignmentTags, +OptionalInt.of(rackAwareTrafficCost), +OptionalInt.of(rackAwareNonOverlapCost), Review Comment: Does this need to be `ofNullable`?
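The `of(StreamsConfig)` factory above fills in assignor-specific defaults only when a rack-aware cost is unset, and it uses `else if` so that at most one assignor's defaults apply. The self-contained sketch below mirrors that flow with hypothetical class-name constants. The default values are the ones quoted in this thread: sticky uses traffic 1 and non-overlap 10, and high-availability uses traffic 10 and non-overlap 1. The sketch also shows why the `ofNullable` question matters. For any other assignor the `Integer` stays `null`, so passing it straight to `OptionalInt.of` would throw, whereas a null-aware conversion yields `OptionalInt.empty()`.

```java
import java.util.OptionalInt;

// Hypothetical sketch: class names and constant names are illustrative, not Kafka's.
final class RackAwareCostDefaults {
    static final String STICKY_ASSIGNOR = "com.example.StickyAssignor";
    static final String HA_ASSIGNOR = "com.example.HighAvailabilityAssignor";

    static final int DEFAULT_STICKY_TRAFFIC_COST = 1;
    static final int DEFAULT_STICKY_NON_OVERLAP_COST = 10;
    static final int DEFAULT_HA_TRAFFIC_COST = 10;
    static final int DEFAULT_HA_NON_OVERLAP_COST = 1;

    /** Resolved costs; an empty OptionalInt means "unset and no assignor-specific default". */
    static final class Costs {
        final OptionalInt trafficCost;
        final OptionalInt nonOverlapCost;

        Costs(OptionalInt trafficCost, OptionalInt nonOverlapCost) {
            this.trafficCost = trafficCost;
            this.nonOverlapCost = nonOverlapCost;
        }
    }

    static Costs resolve(String assignorClassName, Integer configuredTrafficCost, Integer configuredNonOverlapCost) {
        Integer traffic = configuredTrafficCost;
        Integer nonOverlap = configuredNonOverlapCost;
        // else-if: the class name matches at most one assignor, so at most one set of defaults applies
        if (STICKY_ASSIGNOR.equals(assignorClassName)) {
            if (traffic == null) {
                traffic = DEFAULT_STICKY_TRAFFIC_COST;
            }
            if (nonOverlap == null) {
                nonOverlap = DEFAULT_STICKY_NON_OVERLAP_COST;
            }
        } else if (HA_ASSIGNOR.equals(assignorClassName)) {
            if (traffic == null) {
                traffic = DEFAULT_HA_TRAFFIC_COST;
            }
            if (nonOverlap == null) {
                nonOverlap = DEFAULT_HA_NON_OVERLAP_COST;
            }
        }
        // For any other assignor the values may still be null, so convert null-safely
        // instead of calling OptionalInt.of(...) directly (which would unbox null and throw).
        return new Costs(toOptionalInt(traffic), toOptionalInt(nonOverlap));
    }

    private static OptionalInt toOptionalInt(Integer value) {
        return value == null ? OptionalInt.empty() : OptionalInt.of(value);
    }
}
```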
[jira] [Updated] (KAFKA-15045) Move Streams task assignor to public configs
[ https://issues.apache.org/jira/browse/KAFKA-15045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-15045: --- Description: https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams > Move Streams task assignor to public configs > > > Key: KAFKA-15045 > URL: https://issues.apache.org/jira/browse/KAFKA-15045 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Labels: kip > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
apourchet commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1623054265 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ## @@ -29,32 +33,59 @@ public class AssignmentConfigs { private final int numStandbyReplicas; private final long probingRebalanceIntervalMs; private final List rackAwareAssignmentTags; -private final int rackAwareTrafficCost; -private final int rackAwareNonOverlapCost; +private final OptionalInt rackAwareTrafficCost; +private final OptionalInt rackAwareNonOverlapCost; private final String rackAwareAssignmentStrategy; -public AssignmentConfigs(final StreamsConfig configs) { -this( -configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG), -configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG), -configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), - configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), -configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), - configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG) +public static AssignmentConfigs of(final StreamsConfig configs) { +final long acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); +final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); +final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); +final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); +final List rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); +final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); +Optional rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); Review Comment: We need to map from Integer anyway, because otherwise the `OptionalInt.of(int)` construction will NPE.
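`OptionalInt` has `of(int)` and `empty()` but no `ofNullable`. Because `OptionalInt.of` takes a primitive, handing it a `null` `Integer` auto-unboxes and throws `NullPointerException`. That is why the config value has to pass through `Optional<Integer>`, or an explicit null check, before it becomes an `OptionalInt`. The following is a minimal sketch of that mapping. The helper names are hypothetical.

```java
import java.util.Optional;
import java.util.OptionalInt;

final class OptionalIntConversion {
    // OptionalInt has no ofNullable(), so map through Optional<Integer> instead.
    static OptionalInt fromNullable(Integer value) {
        return Optional.ofNullable(value)
            .map(OptionalInt::of)          // unboxes only when a value is present
            .orElse(OptionalInt.empty());
    }

    // Demonstrates the failure mode: OptionalInt.of(null Integer) unboxes and throws.
    static boolean ofThrowsOnNull() {
        Integer missing = null;
        try {
            OptionalInt.of(missing);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```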
[jira] [Created] (KAFKA-16875) Replace ClientState with TaskAssignment when creating individual consumer Assignments
A. Sophie Blee-Goldman created KAFKA-16875: -- Summary: Replace ClientState with TaskAssignment when creating individual consumer Assignments Key: KAFKA-16875 URL: https://issues.apache.org/jira/browse/KAFKA-16875 Project: Kafka Issue Type: Sub-task Reporter: A. Sophie Blee-Goldman In the initial implementation of KIP-924 in version 3.8, we converted from the new TaskAssignor's output type (TaskAssignment) into the old ClientState-based assignment representation. This allowed us to plug in a custom assignor without converting all the internal mechanisms that occur after the KafkaStreams client level assignment and process it into a consumer level assignment. However we ultimately want to get rid of ClientState altogether, so we need to invert this logic so that we instead convert the ClientState into a TaskAssignment and then use the TaskAssignment to process the assigned tasks into consumer Assignments
[jira] [Created] (KAFKA-16874) Remove old TaskAssignor interface
A. Sophie Blee-Goldman created KAFKA-16874: -- Summary: Remove old TaskAssignor interface Key: KAFKA-16874 URL: https://issues.apache.org/jira/browse/KAFKA-16874 Project: Kafka Issue Type: Sub-task Reporter: A. Sophie Blee-Goldman Once we have the new HAAssignor that implements the new TaskAssignor interface, we can remove the old TaskAssignor interface.
[jira] [Created] (KAFKA-16873) Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
A. Sophie Blee-Goldman created KAFKA-16873: -- Summary: Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS Key: KAFKA-16873 URL: https://issues.apache.org/jira/browse/KAFKA-16873 Project: Kafka Issue Type: Sub-task Reporter: A. Sophie Blee-Goldman Once we have all the out-of-the-box assignors implementing the new TaskAssignor interface that corresponds to the new public task assignor config, we can remove the old internal task assignor config altogether.
[jira] [Created] (KAFKA-16872) Remove ClientState class
A. Sophie Blee-Goldman created KAFKA-16872: -- Summary: Remove ClientState class Key: KAFKA-16872 URL: https://issues.apache.org/jira/browse/KAFKA-16872 Project: Kafka Issue Type: Sub-task Reporter: A. Sophie Blee-Goldman One of the end-state goals of KIP-924 is to remove the ClientState class altogether. There are some blockers to this such as the removal of the old internal task assignor config and the old HAAssignor, so this ticket will probably be one of the very last KAFKA-16868 subtasks to be tackled.
[jira] [Created] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams
A. Sophie Blee-Goldman created KAFKA-16871: -- Summary: Clean up internal AssignmentConfigs class in Streams Key: KAFKA-16871 URL: https://issues.apache.org/jira/browse/KAFKA-16871 Project: Kafka Issue Type: Sub-task Reporter: A. Sophie Blee-Goldman In KIP-924 we added a new public AssignmentConfigs class to hold all of the, you guessed it, assignment related configs. However, there is an existing config class of the same name and largely the same contents but that's in an internal package, specifically inside the AssignorConfiguration class. We should remove the old AssignmentConfigs class that's in AssignorConfiguration and replace any usages of it with the new public AssignmentConfigs that we added in KIP-924
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
apourchet commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1623057182 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ## @@ -40,13 +41,17 @@ import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; import org.apache.kafka.streams.processor.assignment.TaskAssignor; import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.apache.kafka.streams.processor.assignment.TaskTopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StickyTaskAssignor implements TaskAssignor { private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); +public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1; +public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10; Review Comment: I just copied these from the original internal StickyTaskAssignor, for the sake of consistency. I'll change them to `DEFAULT_STICKY_*` though.
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
ableegoldman commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1623047318 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ## @@ -29,32 +33,59 @@ public class AssignmentConfigs { private final int numStandbyReplicas; private final long probingRebalanceIntervalMs; private final List rackAwareAssignmentTags; -private final int rackAwareTrafficCost; -private final int rackAwareNonOverlapCost; +private final OptionalInt rackAwareTrafficCost; +private final OptionalInt rackAwareNonOverlapCost; private final String rackAwareAssignmentStrategy; -public AssignmentConfigs(final StreamsConfig configs) { -this( -configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG), -configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG), -configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), - configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), -configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), - configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG) +public static AssignmentConfigs of(final StreamsConfig configs) { +final long acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); +final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); +final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); +final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); +final List rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); +final String rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); +Optional rackAwareTrafficCost = 
Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); +Optional rackAwareNonOverlapCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); + +final String assignorClassName = configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG); +if (StickyTaskAssignor.class.getName().equals(assignorClassName)) { +if (!rackAwareTrafficCost.isPresent()) { +rackAwareTrafficCost = Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST); +} +if (!rackAwareNonOverlapCost.isPresent()) { +rackAwareNonOverlapCost = Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST); +} +} + Review Comment: nit: should be an else-if rather than two separate if statements ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java: ## @@ -29,32 +33,59 @@ public class AssignmentConfigs { private final int numStandbyReplicas; private final long probingRebalanceIntervalMs; private final List rackAwareAssignmentTags; -private final int rackAwareTrafficCost; -private final int rackAwareNonOverlapCost; +private final OptionalInt rackAwareTrafficCost; +private final OptionalInt rackAwareNonOverlapCost; private final String rackAwareAssignmentStrategy; -public AssignmentConfigs(final StreamsConfig configs) { -this( -configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG), -configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG), -configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), - configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), -configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG), - configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG), - configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG) +public static AssignmentConfigs of(final StreamsConfig configs) { +final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); +final int maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); +final int numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); +final long probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); +final List rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); +final String rackAwareAssign
[jira] [Created] (KAFKA-16870) Values.parseString returns objects which fail ConnectSchema.validateValue
Greg Harris created KAFKA-16870: --- Summary: Values.parseString returns objects which fail ConnectSchema.validateValue Key: KAFKA-16870 URL: https://issues.apache.org/jira/browse/KAFKA-16870 Project: Kafka Issue Type: Task Components: connect Reporter: Greg Harris Values.parseString attempts to parse schema'd data out of blind strings. It opportunistically parses maps and arrays, and tries to find a common schema that all values can be cast to. If parsing succeeds but the values don't have a common schema, the Values class emits containers with null inner schemas (schemaless elements, keys, or values). These are not acceptable in ConnectSchema.validateValue, which currently throws an NPE, and after KAFKA-16858 will throw DataException. We should avoid producing bad data from the Values class (and the SimpleHeaderConverter which relies on it) which causes exceptions when used later, for example, as the value of a Struct.
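The failure mode described above comes from falling back to a container with a `null` inner schema when no common element schema exists. The sketch below is a simplified, hypothetical model, not the real `Values` API. Unlike the real class, it does not widen numeric types. It returns `Optional.empty()` when the elements disagree, which forces the caller to choose an explicit fallback rather than emit a schemaless container. One example of such a fallback would be keeping the original string.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for Connect schema types (only three, no numeric widening).
enum SimpleSchemaType { INT64, STRING, BOOLEAN }

final class CommonSchemaInference {
    static SimpleSchemaType typeOf(Object value) {
        if (value instanceof Long) {
            return SimpleSchemaType.INT64;
        }
        if (value instanceof String) {
            return SimpleSchemaType.STRING;
        }
        if (value instanceof Boolean) {
            return SimpleSchemaType.BOOLEAN;
        }
        throw new IllegalArgumentException("unsupported value: " + value);
    }

    // Returns the single type shared by all elements, or empty when there is none,
    // instead of signalling "no common schema" with a null inner schema.
    static Optional<SimpleSchemaType> commonType(List<?> elements) {
        SimpleSchemaType common = null;
        for (Object element : elements) {
            SimpleSchemaType type = typeOf(element);
            if (common == null) {
                common = type;
            } else if (common != type) {
                return Optional.empty();
            }
        }
        // An empty list also has no inferable element type.
        return Optional.ofNullable(common);
    }
}
```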
[jira] [Created] (KAFKA-16869) Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface
A. Sophie Blee-Goldman created KAFKA-16869: -- Summary: Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface Key: KAFKA-16869 URL: https://issues.apache.org/jira/browse/KAFKA-16869 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman We need to add a new HighAvailabilityTaskAssignor that implements the new TaskAssignor interface. Once we have that, we need to remember to also make these related changes: # Change the StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG default from null to the new HAAssignor # Check for this new HAAssignor type when evaluating the OptionalInt rack-aware assignment configs in the public AssignmentConfigs class. If these configs are Optional.empty() and the new HAAssignor is used, they should be overridden to the HAAssignor-specific default values. This code already exists but should be updated to check for the new HAAssignor class name instead of "null" # Until the old HAAssignor and old internal task assignor config can be removed completely, make sure the new HAAssignor is used by default when a TaskAssignor is selected in StreamsPartitionAssignor
[jira] [Created] (KAFKA-16868) Post KIP-924 StreamsPartitionAssignor code cleanup
A. Sophie Blee-Goldman created KAFKA-16868: -- Summary: Post KIP-924 StreamsPartitionAssignor code cleanup Key: KAFKA-16868 URL: https://issues.apache.org/jira/browse/KAFKA-16868 Project: Kafka Issue Type: Improvement Reporter: A. Sophie Blee-Goldman Making an umbrella task for all of the tech debt and code consolidation cleanup work that can/should be done following the implementation of [KIP-924: customizable task assignment for Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams] Most of this revolves around deduplicating code once it's no longer needed, including classes like the ClientState, StandbyTaskAssignor and related elements, and the old TaskAssignor interface along with its implementations. Note that in 3.8, the first version in which KIP-924 was released, we just added the new public config and new TaskAssignor interface but did not get rid of the old internal config or old TaskAssignor interface. If neither config is set in 3.8 we still default to the old HAAssignor, as a kind of opt-in feature flag, and internally will convert the output of the new TaskAssignor into the old style of ClientState-based assignment tracking. We intend to clean up all of the old code and eventually support only the new TaskAssignor interface as well as converting everything internally from the ClientState to the TaskAssignment/KafkaStreamsAssignment style output
[PR] KAFKA-16858: Throw DataException from validateValue on array and map schemas without inner schemas [kafka]
gharris1727 opened a new pull request, #16161: URL: https://github.com/apache/kafka/pull/16161 The SchemaBuilder interface allows for objects with null valueSchema and/or keySchema to be constructed. These currently cause validateValue to throw an NPE on non-empty containers. This PR changes both empty and non-empty containers to throw DataException instead. The first commit rewrites some existing tests, and adds assertions for the prior behavior. Look only at the changes in the later commits to see the actual behavior change. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
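One way to picture the behavior change is a check of the inner schemas up front, before any element is visited. Empty and non-empty containers then fail the same way, with a descriptive exception instead of a `NullPointerException` during iteration. The following is a minimal sketch with hypothetical stand-ins: `DataExceptionSketch` replaces Connect's `DataException`, and plain objects replace schemas. It does not reproduce the real `ConnectSchema.validateValue` signature or its messages.

```java
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.connect.errors.DataException.
class DataExceptionSketch extends RuntimeException {
    DataExceptionSketch(String message) {
        super(message);
    }
}

final class InnerSchemaValidation {
    enum Kind { ARRAY, MAP }

    // Fails fast when a container schema lacks its inner schema(s).
    static void validateInnerSchemas(Kind kind, Object keySchema, Object valueSchema) {
        if (kind == Kind.MAP && keySchema == null) {
            throw new DataExceptionSketch("MAP schema has no key schema");
        }
        if (valueSchema == null) {
            throw new DataExceptionSketch(kind + " schema has no value schema");
        }
    }

    // The inner-schema check runs even when the list is empty, so an empty array
    // with a missing value schema is rejected just like a non-empty one.
    static void validateArray(Object valueSchema, List<?> elements) {
        validateInnerSchemas(Kind.ARRAY, null, valueSchema);
        // Per-element validation against valueSchema would follow here (omitted in this sketch).
    }
}
```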
Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]
edoardocomar commented on code in PR #16151: URL: https://github.com/apache/kafka/pull/16151#discussion_r1623013225 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); -FenceProducersHandler handler = new FenceProducersHandler(logContext); +FenceProducersHandler handler = new FenceProducersHandler(logContext, requestTimeoutMs); Review Comment: https://issues.apache.org/jira/browse/KAFKA-16047?focusedCommentId=17851019
[jira] [Assigned] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar reassigned KAFKA-16570: - Assignee: Edoardo Comar (was: Justine Olshan) > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Edoardo Comar >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fence producer, we don't retry and instead we have no handling > for concurrent transactions and log a message about an unexpected error. > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected though and the operation was successful. We should > just swallow this error and treat this as a successful run of the command.
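The proposed fix amounts to classifying `CONCURRENT_TRANSACTIONS` as success for a fence request. According to the ticket, the coordinator returns that error while it fences and aborts the ongoing transaction, which is exactly what the caller asked for. The following is a hypothetical sketch of that classification. The enum names are illustrative. The real handler works with Kafka's `Errors` type and its own result plumbing, which are not reproduced here.

```java
// Hypothetical error codes and outcomes; not Kafka's Errors enum or handler API.
enum FenceErrorSketch { NONE, CONCURRENT_TRANSACTIONS, TRANSACTIONAL_ID_AUTHORIZATION_FAILED }

enum FenceOutcome { SUCCESS, FAILURE }

final class FenceResponseClassifier {
    static FenceOutcome classify(FenceErrorSketch error) {
        switch (error) {
            case NONE:
            // For a normal producer this error means "retry until a new ID is issued", but for
            // a fence request the fencing (and abort) has already happened, so treat it as success.
            case CONCURRENT_TRANSACTIONS:
                return FenceOutcome.SUCCESS;
            default:
                return FenceOutcome.FAILURE;
        }
    }
}
```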
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1623011371 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -228,7 +231,7 @@ public enum MetadataVersion { * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED. */ -public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4; +public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1; Review Comment: Thanks, @CalvinConfluent. @clolov: In that case, we will need to (1) mark IBP_3_8_IV0 as unused, (2) create a new MV IBP_4_0_IV0, and (3) replace existing references to IBP_3_8_IV0 related to ELR, like the following, with IBP_4_0_IV0.
```
public boolean isElrSupported() {
    return this.isAtLeast(IBP_3_8_IV0);
}
```
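Feature gates such as `isElrSupported()` are expressed as "at least version X". Moving ELR from `IBP_3_8_IV0` to a new `IBP_4_0_IV0` therefore changes only the constant passed to `isAtLeast`. The following is a trimmed-down, hypothetical stand-in for `MetadataVersion`. It has only four constants, and it compares versions by declaration order, which is an assumption about how the real enum orders versions.

```java
// Hypothetical, trimmed-down stand-in for MetadataVersion; constants are declared in release
// order so that enum ordinal comparison reflects version order.
enum MetadataVersionSketch {
    IBP_3_7_IV4,
    IBP_3_8_IV0,   // would be marked unused under the proposal
    IBP_3_8_IV1,
    IBP_4_0_IV0;

    boolean isAtLeast(MetadataVersionSketch other) {
        return this.compareTo(other) >= 0;
    }

    // Under the proposal, the ELR gate moves from IBP_3_8_IV0 to IBP_4_0_IV0.
    boolean isElrSupported() {
        return this.isAtLeast(IBP_4_0_IV0);
    }
}
```

With this change, `IBP_3_8_IV1` (the proposed `LATEST_PRODUCTION`) would no longer report ELR support.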
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
CalvinConfluent commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1622979418 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -228,7 +231,7 @@ public enum MetadataVersion { * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED. */ -public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4; +public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1; Review Comment: Not yet. We are still finalizing the last PR. https://github.com/apache/kafka/pull/15702 @clolov I don't want to block your feature, we should let KIP-966 use another metadata version and mark the IBP_3_8_IV0 as reserved. @junrao What is your suggestion here? Using a future metadata version like IBP_4_0_IV0 for KIP-966 at the moment?
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
CalvinConfluent commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1622977589 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -228,7 +231,7 @@ public enum MetadataVersion { * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED. */ -public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4; +public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1; Review Comment: Not yet. We are still finalizing the last PR. https://github.com/apache/kafka/pull/15702 @clolov I don't want to block your feature, we should enable KIP-966 under another metadata version. @junrao What is your suggestion here? Using a future metadata version like IBP_4_0_IV0 for KIP-966? Also, should I make the change separately, or can we do it in this PR?
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1622968644 ## core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala: ## @@ -55,7 +55,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .build() val debugReplicaRequest = ListOffsetsRequest.Builder - .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, ListOffsetsRequest.DEBUGGING_REPLICA_ID) + .forReplica(ApiKeys.LIST_OFFSETS.latestVersion(), ListOffsetsRequest.DEBUGGING_REPLICA_ID) Review Comment: We do not. I will raise another version tomorrow morning which addresses this and fixes the merge conflicts.
[jira] [Assigned] (KAFKA-16856) Zookeeper - Add new exception
[ https://issues.apache.org/jira/browse/KAFKA-16856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov reassigned KAFKA-16856: - Assignee: Muralidhar Basani > Zookeeper - Add new exception > - > > Key: KAFKA-16856 > URL: https://issues.apache.org/jira/browse/KAFKA-16856 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Muralidhar Basani >Priority: Major > > *Summary* > Add TIERED_STORAGE_DISABLEMENT_IN_PROGRESS exception -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16867) Streams should run tag-based standby assignment based on rack ids
A. Sophie Blee-Goldman created KAFKA-16867: -- Summary: Streams should run tag-based standby assignment based on rack ids Key: KAFKA-16867 URL: https://issues.apache.org/jira/browse/KAFKA-16867 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman In KIP-708, we introduced a tag-based standby task assignment algorithm that runs if the user has configured their clients with "rack aware assignment tags". If no tags are configured, the default load-based standby task assignment algorithm is run instead. In KIP-925 we introduced a different kind of rack-aware assignment logic which is based on the "rack.id" of the consumers and topic partitions. While this did not replace the tag-based rack-aware assignment of KIP-708, which had different (and opposing) goals, we realized that Streams could leverage the rack.ids to run the tag-based standby task assignment algorithm even if clients were not configured with assignment tags. Unfortunately, during implementation of KIP-925, a bug in the logic meant that the tag-based algorithm was never actually being run based on the rack ids. This bug is present to this day and was carried over (intentionally) during the task assignor refactoring of KIP-924. We should still fix this bug so that users can benefit from the resiliency of KIP-708 based on consumer rack ids, even if they did not explicitly opt in by configuring clients with assignment tags, since KIP-708 is a net benefit with no downside.
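The fallback the ticket asks for, running the tag-based standby assignment from consumer rack ids when no assignment tags are configured, might look roughly like the sketch below. It is not the Streams assignor code: `StandbyTagSelection`, `effectiveClientTags`, the tag key `"rack"`, and the "every client needs tags" rule are all hypothetical choices made for illustration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not the Kafka Streams assignor: chooses the tags that a
// KIP-708 style tag-based standby assignment would run on for one client.
class StandbyTagSelection {
    // Assumed tag key used when a rack id stands in for user-configured tags.
    static final String RACK_TAG = "rack";

    static Map<String, String> effectiveClientTags(Map<String, String> configuredTags,
                                                   Optional<String> rackId) {
        // Explicitly configured assignment tags always win.
        if (!configuredTags.isEmpty()) {
            return configuredTags;
        }
        // Otherwise fall back to the consumer's rack id, if it has one.
        return rackId
            .map(rack -> Collections.singletonMap(RACK_TAG, rack))
            .orElse(Collections.emptyMap());
    }

    // Assumed rule for this sketch: tag-based assignment runs only if every client has tags.
    static boolean canUseTagBasedAssignment(Iterable<Map<String, String>> allClientTags) {
        for (Map<String, String> tags : allClientTags) {
            if (tags.isEmpty()) {
                return false;
            }
        }
        return true;
    }
}
```

Keeping the configured tags first preserves the existing opt-in behaviour; the rack id only fills the gap for clients that configured nothing.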
[jira] [Assigned] (KAFKA-16248) Kafka consumer should cache leader offset ranges
[ https://issues.apache.org/jira/browse/KAFKA-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-16248: --- Assignee: Alieh Saeedi > Kafka consumer should cache leader offset ranges > > > Key: KAFKA-16248 > URL: https://issues.apache.org/jira/browse/KAFKA-16248 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Brutschy >Assignee: Alieh Saeedi >Priority: Critical > > We noticed a streams application received an OFFSET_OUT_OF_RANGE error > following a network partition and streams task rebalance and subsequently > reset its offsets to the beginning. > Inspecting the logs, we saw multiple consumer log messages like: > {code:java} > Setting offset for partition tp to the committed offset > FetchPosition{offset=1234, offsetEpoch=Optional.empty...) > {code} > Inspecting the streams code, it looks like kafka streams calls `commitSync` > passing through an explicit OffsetAndMetadata object but does not populate > the offset leader epoch. > The offset leader epoch is required in the offset commit to ensure that all > consumers in the consumer group have coherent metadata before fetching. > Otherwise after a consumer group rebalance, a consumer may fetch with a stale > leader epoch with respect to the committed offset and get an offset out of > range error from a zombie partition leader. > An example of where this can cause issues: > 1. We have a consumer group with consumer 1 and consumer 2. Partition P is > assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has > stale metadata for P. > 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits the offset > 50 without an epoch. > 3. The consumer group rebalances and P is now assigned to consumer 2. > Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). > Consumer 2 will now try to fetch with leader epoch 7, offset 50.
If we have a > zombie leader due to a network partition, the zombie leader may accept > consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer > 2. > If in step 2, consumer 1 committed the leader epoch for the message, then > when consumer 2 receives assignment P it would force a metadata refresh to > discover a sufficiently new leader epoch for the committed offset. > Kafka Streams cannot fully determine the leader epoch of the offsets it wants > to commit - in EOS mode, streams commits the offset after the last control > records (to avoid always having a lag of >0), but the leader epoch of the > control record is not known to streams (since only non-control records are > returned from Consumer.poll). > A fix discussed with [~hachikuji] is to have the consumer cache leader epoch > ranges, similar to how the broker maintains a leader epoch cache. > This ticket was split from the original ticket > https://issues.apache.org/jira/browse/KAFKA-15344 which was described as a > streams fix, but the problem cannot be fully fixed in streams.
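The proposed fix, a consumer-side cache of leader epoch ranges similar to the broker's leader epoch cache, can be sketched as a sorted map from each epoch's start offset to the epoch. This is a minimal illustration, not the consumer's implementation: `LeaderEpochRangeCache`, `maybeAssign`, and `epochForOffset` are hypothetical names, and the sketch assumes the consumer can learn (epoch, start offset) pairs from the batches it fetches.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical per-partition cache; not Kafka's implementation.
// Each entry maps the first offset written under a leader epoch to that epoch.
class LeaderEpochRangeCache {
    private final NavigableMap<Long, Integer> epochByStartOffset = new TreeMap<>();

    // Record that `epoch` starts at `startOffset`. Entries that go backwards in epoch or
    // offset are ignored; a newer epoch starting at the same offset replaces the old one.
    void maybeAssign(int epoch, long startOffset) {
        Map.Entry<Long, Integer> last = epochByStartOffset.lastEntry();
        if (last != null && (epoch <= last.getValue() || startOffset < last.getKey())) {
            return;
        }
        epochByStartOffset.put(startOffset, epoch);
    }

    // The epoch whose range contains `offset`: the entry with the greatest start offset <= offset.
    Optional<Integer> epochForOffset(long offset) {
        return Optional.ofNullable(epochByStartOffset.floorEntry(offset)).map(Map.Entry::getValue);
    }
}
```

With epoch 7 starting at offset 0 and epoch 8 starting at offset 40, a commit at offset 50 (the ticket's example) would be tagged with epoch 8, even when offset 50 sits just past a control record that `poll` never returned.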
[jira] [Commented] (KAFKA-16698) Fix flaky kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota
[ https://issues.apache.org/jira/browse/KAFKA-16698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851200#comment-17851200 ] Chia-Ping Tsai commented on KAFKA-16698: [~muralibasani] Could you file a PR with your solution? I'd like to take a look and see what happens in QA. > Fix flaky > kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota > --- > > Key: KAFKA-16698 > URL: https://issues.apache.org/jira/browse/KAFKA-16698 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Priority: Major > > {code:java} > org.opentest4j.AssertionFailedError: Timed out waiting for connection rate > update to propagate at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138) at > app//kafka.network.DynamicConnectionQuotaTest.updateIpConnectionRate(DynamicConnectionQuotaTest.scala:279) >at > app//kafka.network.DynamicConnectionQuotaTest.testDynamicIpConnectionRateQuota(DynamicConnectionQuotaTest.scala:255) > at > java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) >at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) >at >
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > {code}
[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851197#comment-17851197 ] Chia-Ping Tsai commented on KAFKA-16639: I have shipped the PR into trunk and 3.8. [~frankvicky] thanks for all your contributions. Nice fix! > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave the group when > there is an in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212]). > It seems to me the simple solution is that we create a heartbeat request when > meeting the above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62]).
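The root cause and the proposed fix can be illustrated with a toy model. This is a hypothetical sketch, not the real `HeartbeatRequestManager`: `ToyHeartbeatManager` and its methods are illustrative names, with `pollOnClose` borrowed from the ticket. It assumes a member epoch of -1 signals "leave group", as in the KIP-848 consumer heartbeat. Regular heartbeats are skipped while one is in flight, but the leave heartbeat is always built on close.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the behaviour described in KAFKA-16639; not the real HeartbeatRequestManager.
class ToyHeartbeatManager {
    // Assumed leave marker: a member epoch of -1 asks the coordinator to remove the member.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    private boolean requestInFlight = false;
    private final List<Integer> sentEpochs = new ArrayList<>();

    // Regular heartbeat: skipped while a previous heartbeat awaits its response.
    void poll(int memberEpoch) {
        if (!requestInFlight) {
            send(memberEpoch);
        }
    }

    // On close: always send the leave heartbeat, even with a request in flight.
    // Applying the in-flight check here as well is the bug the ticket describes.
    void pollOnClose() {
        send(LEAVE_GROUP_MEMBER_EPOCH);
    }

    void onResponse() {
        requestInFlight = false;
    }

    private void send(int memberEpoch) {
        requestInFlight = true;
        sentEpochs.add(memberEpoch);
    }

    List<Integer> sentEpochs() {
        return sentEpochs;
    }
}
```

Closing right after creation corresponds to calling `poll` once and then `pollOnClose` before any response arrives; the leave heartbeat still goes out.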
[jira] [Resolved] (KAFKA-16864) Copy on write in the Optimized Uniform Assignor
[ https://issues.apache.org/jira/browse/KAFKA-16864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16864. - Fix Version/s: 3.8.0 Resolution: Fixed > Copy on write in the Optimized Uniform Assignor > --- > > Key: KAFKA-16864 > URL: https://issues.apache.org/jira/browse/KAFKA-16864 > Project: Kafka > Issue Type: Sub-task >Reporter: Ritika Reddy >Assignee: David Jacot >Priority: Major > Fix For: 3.8.0 > > > An optimization for the uniform (homogenous) assignor that avoids creating a > copy of all the assignments. Instead, the assignor creates a copy only if the > assignment is updated. It is a sort of copy-on-write. This change reduces the > overhead of the TargetAssignmentBuilder when run with the uniform > (homogenous) assignor.
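The copy-on-write idea in this ticket can be sketched as follows. The names (`CopyOnWriteAssignments`, `assign`, `result`) are hypothetical and this is not the assignor's actual code. The existing assignment map is shared until the first real change, and it is copied at most once.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of copy-on-write for a target assignment (member id -> partitions).
// Not the actual uniform assignor code.
class CopyOnWriteAssignments {
    private final Map<String, Set<Integer>> original; // shared and never mutated here
    private Map<String, Set<Integer>> copy;           // created on the first real change

    CopyOnWriteAssignments(Map<String, Set<Integer>> original) {
        this.original = original;
    }

    void assign(String memberId, Set<Integer> partitions) {
        Map<String, Set<Integer>> current = copy != null ? copy : original;
        if (partitions.equals(current.get(memberId))) {
            return; // unchanged member: no copy needed
        }
        if (copy == null) {
            // Shallow copy, paid at most once; sets are replaced, never mutated in place.
            copy = new HashMap<>(original);
        }
        copy.put(memberId, partitions);
    }

    // Returns the original map instance itself when nothing changed.
    Map<String, Set<Integer>> result() {
        return copy != null ? copy : original;
    }
}
```

When a rebalance leaves every member's partitions unchanged, no map is allocated at all, which is where the savings for the target assignment builder would come from.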
[jira] [Resolved] (KAFKA-16860) Introduce `group.version` feature flag
[ https://issues.apache.org/jira/browse/KAFKA-16860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16860. - Resolution: Fixed > Introduce `group.version` feature flag > -- > > Key: KAFKA-16860 > URL: https://issues.apache.org/jira/browse/KAFKA-16860 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.8.0
Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]
dajac merged PR #16088: URL: https://github.com/apache/kafka/pull/16088
[jira] [Resolved] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group
[ https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16639. Resolution: Fixed > AsyncKafkaConsumer#close does not send heartbeat to leave group > --- > > Key: KAFKA-16639 > URL: https://issues.apache.org/jira/browse/KAFKA-16639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > This bug can be reproduced by immediately closing a consumer which is just > created. > The root cause is that we skip the new heartbeat used to leave the group when > there is an in-flight heartbeat request > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212]). > It seems to me the simple solution is that we create a heartbeat request when > meeting the above situation and then send it by pollOnClose > ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62]).
Re: [PR] KAFKA-16639: Ensure HeartbeatRequestManager generates leave request regardless of in-flight heartbeats. [kafka]
chia7712 merged PR #16017: URL: https://github.com/apache/kafka/pull/16017
Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
dajac commented on code in PR #16158: URL: https://github.com/apache/kafka/pull/16158#discussion_r1622909655 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3852,6 +3853,17 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception)) } else { + if (request.header.apiVersion >= 3 && includeAuthorizedOperations) { Review Comment: I suppose that we don't need the version check because this API only has version zero.
[PR] KAFKA-16252: Dynamic KRaft network manager and channel [kafka]
ahuang98 opened a new pull request, #16160: URL: https://github.com/apache/kafka/pull/16160 See https://github.com/apache/kafka/pull/15986 for the description and full comment history; this addresses some of the comments from that PR (anything with 👀). Specifically, https://github.com/apache/kafka/pull/15986#discussion_r1619520419, https://github.com/apache/kafka/pull/15986#discussion_r1619518236, and > I don't think we want to change all of the tests to use controller.quorum.bootstrap.servers. We still need to support the old configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the new thing, and tests using an older MV use the old configuration. That way we will have good coverage. are not addressed in this PR. I think it would be alright for them to be addressed in a followup. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
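The test setup requested in the quoted comment, choosing the quorum configuration by metadata version, could be sketched as below. The enum is a local stand-in for `MetadataVersion` containing only the three version names that appear in this thread, declared oldest to newest. `quorumConfigKey` is a hypothetical helper. The sketch assumes the "old configuration" means the static `controller.quorum.voters` setting.

```java
// Local stand-in for MetadataVersion: only the versions named in this thread,
// declared oldest to newest so that enum order matches version order.
enum ToyMetadataVersion {
    IBP_3_7_IV4, IBP_3_8_IV0, IBP_3_8_IV1;

    boolean isAtLeast(ToyMetadataVersion other) {
        return compareTo(other) >= 0;
    }
}

class QuorumTestConfig {
    // Hypothetical test helper: the new bootstrap config for IBP_3_8_IV0 or newer,
    // the (assumed) static voters config for older metadata versions.
    static String quorumConfigKey(ToyMetadataVersion version) {
        return version.isAtLeast(ToyMetadataVersion.IBP_3_8_IV0)
            ? "controller.quorum.bootstrap.servers"
            : "controller.quorum.voters";
    }
}
```

Parameterizing tests over both sides of the version boundary would give the coverage the comment asks for without rewriting every test.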
Re: [PR] MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dajac commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1622897443 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -832,7 +855,20 @@ public void validateOffsetFetch( throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", memberId, groupId)); } -validateMemberEpoch(memberEpoch, member.memberEpoch()); + +if (member.useClassicProtocol()) { +try { +validateMemberEpoch(memberEpoch, member.memberEpoch()); +} catch (StaleMemberEpochException ex) { +// StaleMemberEpochException is not supported in the classic protocol. We throw +// IllegalGenerationException instead for compatibility. +throw new IllegalGenerationException(String.format("Invalid offset commit because the " Review Comment: nit: `offset fetch`. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -832,7 +855,20 @@ public void validateOffsetFetch( throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.", memberId, groupId)); } -validateMemberEpoch(memberEpoch, member.memberEpoch()); + +if (member.useClassicProtocol()) { +try { +validateMemberEpoch(memberEpoch, member.memberEpoch()); +} catch (StaleMemberEpochException ex) { +// StaleMemberEpochException is not supported in the classic protocol. We throw +// IllegalGenerationException instead for compatibility. +throw new IllegalGenerationException(String.format("Invalid offset commit because the " Review Comment: I actually wonder if we could reuse the code too. We could define a helper method which takes the member, the received epoch and the source (e.g. offset commit or offset fetch). What do you think?
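The shared helper suggested in this review could take roughly the following shape. This is a hypothetical, self-contained sketch, not the coordinator's code. The three exception classes are local stand-ins for Kafka's types. `ToyMember` is a minimal stand-in member, and `validateMemberEpochForRequest` is an invented name. The epoch rule (newer than expected is fenced, older is stale) is an assumption made for illustration.

```java
// Local stand-ins for Kafka's exception types; not the real classes.
class StaleMemberEpochException extends RuntimeException {
    StaleMemberEpochException(String message) { super(message); }
}

class FencedMemberEpochException extends RuntimeException {
    FencedMemberEpochException(String message) { super(message); }
}

class IllegalGenerationException extends RuntimeException {
    IllegalGenerationException(String message) { super(message); }
}

// Minimal stand-in for a consumer group member.
class ToyMember {
    final int memberEpoch;
    final boolean useClassicProtocol;

    ToyMember(int memberEpoch, boolean useClassicProtocol) {
        this.memberEpoch = memberEpoch;
        this.useClassicProtocol = useClassicProtocol;
    }
}

class EpochValidation {
    // Assumed epoch rule for this sketch: newer than expected is fenced, older is stale.
    static void validateMemberEpoch(int received, int expected) {
        if (received > expected) {
            throw new FencedMemberEpochException("Member epoch " + received + " is newer than " + expected);
        }
        if (received < expected) {
            throw new StaleMemberEpochException("Member epoch " + received + " is older than " + expected);
        }
    }

    // Hypothetical shared helper; `source` names the request, e.g. "offset commit" or "offset fetch".
    static void validateMemberEpochForRequest(ToyMember member, int receivedEpoch, String source) {
        try {
            validateMemberEpoch(receivedEpoch, member.memberEpoch);
        } catch (StaleMemberEpochException ex) {
            if (!member.useClassicProtocol) {
                throw ex;
            }
            // Classic-protocol clients do not understand StaleMemberEpochException.
            throw new IllegalGenerationException(String.format(
                "Invalid %s because the received generation id %d does not match the expected generation id %d.",
                source, receivedEpoch, member.memberEpoch));
        }
    }
}
```

Passing the request name as `source` also removes the copy-paste risk behind the `offset fetch` nit, because the message text comes from the caller instead of being duplicated per method.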
Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]
chia7712 commented on code in PR #15862: URL: https://github.com/apache/kafka/pull/15862#discussion_r1622894813 ## core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java: ## @@ -17,29 +17,64 @@ package kafka.test.junit; +import kafka.test.ClusterConfig; import kafka.test.annotation.ClusterTemplate; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.extension.ExtensionContext; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ClusterTestExtensionsUnitTest { + +static List cfgEmpty() { +return Collections.emptyList(); +} + +@SuppressWarnings({"unchecked", "rawtypes"}) +private ExtensionContext buildExtensionContext(String methodName) throws Exception { +ExtensionContext extensionContext = mock(ExtensionContext.class); +Class clazz = ClusterTestExtensionsUnitTest.class; +Method method = clazz.getDeclaredMethod(methodName); +when(extensionContext.getRequiredTestClass()).thenReturn(clazz); +when(extensionContext.getRequiredTestMethod()).thenReturn(method); +return extensionContext; +} + @Test -void testProcessClusterTemplate() { +void testProcessClusterTemplate() throws Exception { ClusterTestExtensions ext = new ClusterTestExtensions(); -ExtensionContext context = mock(ExtensionContext.class); +ExtensionContext context = buildExtensionContext("cfgEmpty"); ClusterTemplate annot = mock(ClusterTemplate.class); -when(annot.value()).thenReturn("").thenReturn(" "); +when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("cfgEmpty"); + +Assertions.assertEquals( Review Comment: This test is equal to the next one, right?
Re: [PR] KAFKA-16860; [2/2] Introduce group.version feature flag [kafka]
dajac merged PR #16149: URL: https://github.com/apache/kafka/pull/16149
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac merged PR #16120: URL: https://github.com/apache/kafka/pull/16120
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142887987 I confirmed that the failed tests are not related. Merging to trunk and to 3.8.
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142885524 testTopicPartitionsArgWithInternalIncluded does not fail locally. It is also a flaky test.
[PR] SMT InsertField current timestamp [kafka]
andreit-geomotiv opened a new pull request, #16159: URL: https://github.com/apache/kafka/pull/16159 Kafka Connect single message transform for inserting a field with the current system timestamp value. ``` "transforms": "insertCurrentTimestamp", "transforms.insertCurrentTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.insertCurrentTimestamp.current.timestamp.field": "LOAD_TIMESTAMP_UTC" ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
sebastienviale commented on PR #16148: URL: https://github.com/apache/kafka/pull/16148#issuecomment-2142880744 Hmm, maybe a check is needed. If so, I will create another PR.
Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
dajac commented on PR #16158: URL: https://github.com/apache/kafka/pull/16158#issuecomment-2142879003 @riedelmax Thanks for the patch. Could you please extend unit and integration tests to cover this change?
Re: [PR] MINOR: Bump trunk to 3.9.0-SNAPSHOT [kafka]
jlprat commented on PR #16150: URL: https://github.com/apache/kafka/pull/16150#issuecomment-2142879659 Hi @jolshan, according to KIP-1012 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8.x+release) we agreed to have a version in the 3.x series with feature parity between KRaft and ZooKeeper. KIP-853 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes) and KIP-966 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas) were identified as must-haves. These two are not done right now, and it's not clear we can get them in before code freeze. For this reason I decided to err on the side of caution and assume there will be a 3.9.0. If we manage to reach the desired parity, we can always change it to 4.0.0. Let me know if you have further questions.
Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]
ganesh-sadanala commented on code in PR #16080: URL: https://github.com/apache/kafka/pull/16080#discussion_r1622879301 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; public static final String OFFSET_LAG_MAX = "offset.lag.max"; -private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; +private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" + +"\n" + +"When the lag for a remote partition exceeds the offset.lag.max value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" + Review Comment: Thank you for pointing this out and providing the feedback. You're correct!
Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]
ganesh-sadanala commented on code in PR #16080: URL: https://github.com/apache/kafka/pull/16080#discussion_r1622872574 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; public static final String OFFSET_LAG_MAX = "offset.lag.max"; -private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; +private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" + Review Comment: This clearly explains the latest mapping record in the offset sync topic, which states the latest sync in the target/remote partition. Thank you!
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
mjsax commented on PR #16148: URL: https://github.com/apache/kafka/pull/16148#issuecomment-2142860090 Just wondering. This seems to also apply to `all-latency-max` and the other metrics over iterators, i.e., range?
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
mjsax commented on PR #16148: URL: https://github.com/apache/kafka/pull/16148#issuecomment-2142858494 Thanks for the PR!
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
mjsax merged PR #16148: URL: https://github.com/apache/kafka/pull/16148
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1622840595 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ## @@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) { public void init(final InternalProcessorContext context) { super.init(context); this.context = context; -keySerializer = prepareKeySerializer(keySerializer, context, this.name()); -valSerializer = prepareValueSerializer(valSerializer, context, this.name()); +try { +keySerializer = prepareKeySerializer(keySerializer, context, this.name()); +} catch (final ConfigException e) { +throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name())); +} catch (final StreamsException e) { Review Comment: Thanks everyone for your input! I will make the change accordingly.
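The hunk above rethrows a serde-initialization failure with the sink node's name in the message. A minimal sketch of that pattern, assuming a generic wrapper is acceptable; `NodeInitException` and `initWithContext` are hypothetical names, not Kafka Streams API. Unlike a rethrow that carries only a new message, the sketch also keeps the original exception as the cause:

```java
import java.util.function.Supplier;

// Sketch: add the node name to an initialization failure without losing the root cause.
final class SerdeInitSketch {

    // Hypothetical stand-in for a Kafka Streams exception type; not Kafka API.
    static final class NodeInitException extends RuntimeException {
        NodeInitException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Runs one initialization step; on failure, rethrows with the node name in the message
    // and the original exception attached as the cause.
    static <T> T initWithContext(final String what, final String nodeName, final Supplier<T> step) {
        try {
            return step.get();
        } catch (final RuntimeException e) {
            throw new NodeInitException(
                String.format("Failed to initialize %s for sink node %s", what, nodeName), e);
        }
    }

    public static void main(final String[] args) {
        try {
            initWithContext("key serdes", "KSTREAM-SINK-0000000003", () -> {
                throw new IllegalStateException("no default key serde configured");
            });
        } catch (final NodeInitException e) {
            // prints: Failed to initialize key serdes for sink node KSTREAM-SINK-0000000003
            System.out.println(e.getMessage());
            // prints: caused by: no default key serde configured
            System.out.println("caused by: " + e.getCause().getMessage());
        }
    }
}
```

Catching `RuntimeException` broadly is a simplification; the diff catches `ConfigException` and `StreamsException` separately.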
[PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]
riedelmax opened a new pull request, #16158: URL: https://github.com/apache/kafka/pull/16158 Last PR for KAFKA-14509
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
sebastienviale commented on code in PR #16148: URL: https://github.com/apache/kafka/pull/16148#discussion_r1622820325 ## docs/ops.html: ## @@ -3143,7 +3143,7 @@
Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1622806164 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -806,7 +809,29 @@ public void validateOffsetCommit( if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); -validateMemberEpoch(memberEpoch, member.memberEpoch()); +if (member.useClassicProtocol()) { +validateMemberInstanceId(member, groupInstanceId); Review Comment: Let's remove this validation because we have to do `getOrMaybeCreateMember(memberId, false);` and know whether the member uses the classic protocol before deciding whether instance id should be validated.
Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]
dongnuo123 commented on code in PR #16145: URL: https://github.com/apache/kafka/pull/16145#discussion_r1622804714 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -806,7 +809,29 @@ public void validateOffsetCommit( if (memberEpoch < 0 && members().isEmpty()) return; final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false); -validateMemberEpoch(memberEpoch, member.memberEpoch()); +if (member.useClassicProtocol()) { +validateMemberInstanceId(member, groupInstanceId); + +try { +validateMemberEpoch(memberEpoch, member.memberEpoch()); +} catch (StaleMemberEpochException ex) { +// StaleMemberEpochException is not supported in the classic protocol. We throw +// IllegalGenerationException instead for compatibility. +throw new IllegalGenerationException(String.format("Invalid offset commit because the " ++ "received generation id %d does not match the expected member epoch %d.", +memberEpoch, member.memberEpoch())); +} + +if (member.memberEpoch() < groupEpoch() || +member.state() == MemberState.UNREVOKED_PARTITIONS || +(member.state() == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition(member))) { +throw new RebalanceInProgressException(String.format("Invalid offset commit because" + +" a new rebalance has been triggered in group %s and member %s should rejoin to catch up.", +groupId(), member.memberId())); +} Review Comment: Got it, now I know why the classic consumer offset commit validation doesn't check PREPARING_REBALANCE.
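The hunk above translates `StaleMemberEpochException` into `IllegalGenerationException` for classic-protocol members, then rejects the commit while the member still needs to rejoin. A simplified sketch of that flow, assuming an epoch ahead of the member's is fenced and one behind it is stale (the hunk does not show `validateMemberEpoch` itself). The exception classes, `MemberState` values, and the `waitingOnUnreleasedPartition` flag are local stand-ins, not the group coordinator's API, and the instance-id check is left out:

```java
// Simplified model of the classic-protocol offset commit checks; all types are local stand-ins.
final class ClassicOffsetCommitSketch {

    static final class FencedMemberEpochException extends RuntimeException {
        FencedMemberEpochException(final String m) { super(m); }
    }
    static final class StaleMemberEpochException extends RuntimeException {
        StaleMemberEpochException(final String m) { super(m); }
    }
    static final class IllegalGenerationException extends RuntimeException {
        IllegalGenerationException(final String m) { super(m); }
    }
    static final class RebalanceInProgressException extends RuntimeException {
        RebalanceInProgressException(final String m) { super(m); }
    }

    enum MemberState { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    // Assumed epoch rule: an epoch ahead of the member's is fenced, one behind it is stale.
    static void validateMemberEpoch(final int received, final int expected) {
        if (received > expected) {
            throw new FencedMemberEpochException("received epoch " + received + " > " + expected);
        }
        if (received < expected) {
            throw new StaleMemberEpochException("received epoch " + received + " < " + expected);
        }
    }

    static void validateClassicOffsetCommit(final int receivedEpoch, final int memberEpoch, final int groupEpoch,
                                            final MemberState state, final boolean waitingOnUnreleasedPartition) {
        try {
            validateMemberEpoch(receivedEpoch, memberEpoch);
        } catch (final StaleMemberEpochException ex) {
            // Classic clients do not understand a stale member epoch, so report an illegal generation.
            throw new IllegalGenerationException("generation " + receivedEpoch + " does not match " + memberEpoch);
        }
        // The member must rejoin if it lags the group epoch or still has partitions to revoke or release.
        if (memberEpoch < groupEpoch
                || state == MemberState.UNREVOKED_PARTITIONS
                || (state == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition)) {
            throw new RebalanceInProgressException("member should rejoin to catch up");
        }
    }

    // Returns "OK" or the simple name of the exception that the validation threw.
    static String outcome(final int receivedEpoch, final int memberEpoch, final int groupEpoch,
                          final MemberState state, final boolean waitingOnUnreleasedPartition) {
        try {
            validateClassicOffsetCommit(receivedEpoch, memberEpoch, groupEpoch, state, waitingOnUnreleasedPartition);
            return "OK";
        } catch (final RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        System.out.println(outcome(4, 5, 5, MemberState.STABLE, false)); // IllegalGenerationException
        System.out.println(outcome(5, 5, 6, MemberState.STABLE, false)); // RebalanceInProgressException
        System.out.println(outcome(5, 5, 5, MemberState.STABLE, false)); // OK
    }
}
```

A fenced epoch is deliberately not translated here, which matches the hunk only catching `StaleMemberEpochException`.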
[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful
[ https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851171#comment-17851171 ] Justine Olshan commented on KAFKA-16570: Hey [~ecomar], thanks. If you don't mind assigning yourself to the JIRA, I can review it (y) I've been busy with other 3.8 stuff. > FenceProducers API returns "unexpected error" when successful > - > > Key: KAFKA-16570 > URL: https://issues.apache.org/jira/browse/KAFKA-16570 > Project: Kafka > Issue Type: Bug >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When we want to fence a producer using the admin client, we send an > InitProducerId request. > There is logic in that API to fence (and abort) any ongoing transactions and > that is what the API relies on to fence the producer. However, this handling > also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because > we want to actually get a new producer ID and want to retry until the ID > is supplied or we time out. > [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170] > > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322] > > In the case of fence producer, we don't retry and instead we have no handling > for concurrent transactions and log a message about an unexpected error. > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112] > > This is not unexpected though and the operation was successful. We should > just swallow this error and treat this as a successful run of the command. -- This message was sent by Atlassian Jira (v8.20.10#820010)
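The issue proposes swallowing CONCURRENT_TRANSACTIONS when fencing, because the InitProducerId request that returns it has already fenced the producer and aborted the ongoing transaction. A minimal sketch of that decision, assuming a three-way success/retry/fail outcome; `ErrorCode` and `Outcome` are hypothetical local enums (the constant names follow Kafka's error names), and treating COORDINATOR_LOAD_IN_PROGRESS as retriable is an assumption added for illustration, not part of the issue:

```java
// Sketch: map a fencing response's error to an outcome; local enums, not Kafka's Errors API.
final class FenceProducersOutcomeSketch {

    enum ErrorCode { NONE, CONCURRENT_TRANSACTIONS, COORDINATOR_LOAD_IN_PROGRESS, TRANSACTIONAL_ID_AUTHORIZATION_FAILED }

    enum Outcome { SUCCESS, RETRY, FAIL }

    static Outcome handleFenceResponse(final ErrorCode error) {
        switch (error) {
            case NONE:
            // Per the issue, the request has already fenced the producer and aborted its
            // ongoing transaction when this error comes back, which is all fencing needs.
            case CONCURRENT_TRANSACTIONS:
                return Outcome.SUCCESS;
            // Assumption for illustration: coordinator loading is transient and worth retrying.
            case COORDINATOR_LOAD_IN_PROGRESS:
                return Outcome.RETRY;
            default:
                return Outcome.FAIL;
        }
    }

    public static void main(final String[] args) {
        for (final ErrorCode e : ErrorCode.values()) {
            System.out.println(e + " -> " + handleFenceResponse(e));
        }
    }
}
```

The key line is the fall-through from NONE into CONCURRENT_TRANSACTIONS: for a fencing call both mean the command succeeded, whereas a regular producer retries on CONCURRENT_TRANSACTIONS until it gets a new producer ID.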
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on code in PR #15673: URL: https://github.com/apache/kafka/pull/15673#discussion_r1622789214 ## core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala: ## @@ -55,7 +55,7 @@ class ListOffsetsRequestTest extends BaseRequestTest { .build() val debugReplicaRequest = ListOffsetsRequest.Builder - .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, ListOffsetsRequest.DEBUGGING_REPLICA_ID) + .forReplica(ApiKeys.LIST_OFFSETS.latestVersion(), ListOffsetsRequest.DEBUGGING_REPLICA_ID) Review Comment: Why do we need to add the brackets? ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -228,7 +231,7 @@ public enum MetadataVersion { * Think carefully before you update this value. ONCE A METADATA VERSION IS PRODUCTION, * IT CANNOT BE CHANGED. */ -public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4; +public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1; Review Comment: @CalvinConfluent and @cmccabe: Is IBP_3_8_IV0 for KIP-966 production ready?
Re: [PR] MINOR: Bump trunk to 3.9.0-SNAPSHOT [kafka]
jolshan commented on PR #16150: URL: https://github.com/apache/kafka/pull/16150#issuecomment-2142758803 Hmmm. I'm not sure I follow the reasoning for bumping to 3.9 rather than 4.0.
Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]
gharris1727 commented on code in PR #16080: URL: https://github.com/apache/kafka/pull/16080#discussion_r1622763556 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; public static final String OFFSET_LAG_MAX = "offset.lag.max"; -private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; +private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" + +"\n" + +"When the lag for a remote partition exceeds the offset.lag.max value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" + Review Comment: > This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition. This is incorrect, and I don't know where this information came from. When the lag for a remote partition exceeds offset.lag.max, it will emit an offset sync to the offset sync topic, which can then be used by the MirrorCheckpointTask to translate upstream and downstream offsets. 
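To make the reviewer's description concrete: an offset sync is emitted when the lag passes `offset.lag.max`, and `MirrorCheckpointTask` then uses those syncs to translate offsets. A simplified model of that decision, assuming that consumer offsets past the last sync translate to one past the sync's downstream offset, so the "lag" is how stale such a translated offset would become without a new sync. `OffsetSyncSketch` is a hypothetical class, not MirrorMaker's implementation; it does not model the periodic syncs tied to `offset.flush.interval.ms` or any other trigger the real `MirrorSourceTask` may use:

```java
// Simplified model of when a mirrored partition emits an offset sync; not MirrorMaker's code.
final class OffsetSyncSketch {
    private final long maxOffsetLag;          // plays the role of offset.lag.max
    private long lastSyncUpstreamOffset = -1L;
    private long lastSyncDownstreamOffset = -1L;

    OffsetSyncSketch(final long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Called once per replicated record; returns true when a new (upstream, downstream)
    // offset pair should be written to the offset sync topic.
    boolean update(final long upstreamOffset, final long downstreamOffset) {
        final boolean noPreviousSync = lastSyncDownstreamOffset == -1L;
        // Assumption: offsets past the last sync translate to lastSyncDownstreamOffset + 1,
        // so this is how far behind such a translated offset would fall.
        final boolean translationTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag;
        if (noPreviousSync || translationTooStale) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
            return true;
        }
        return false;
    }

    long lastSyncUpstreamOffset() {
        return lastSyncUpstreamOffset;
    }

    public static void main(final String[] args) {
        final OffsetSyncSketch state = new OffsetSyncSketch(3);
        for (long offset = 0; offset < 10; offset++) {
            // Identical offsets on both sides, for simplicity.
            if (state.update(offset, offset)) {
                System.out.println("emit offset sync at " + offset);
            }
        }
    }
}
```

With a limit of 3 and identical offsets on both sides, `main` emits syncs at offsets 0, 4 and 8; with a limit of 0 every record triggers a sync. Nothing in this model re-copies records, which is the point of the review: the setting controls how often offset syncs are written, not a data resync.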
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; public static final String OFFSET_LAG_MAX = "offset.lag.max"; -private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; +private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" + +"\n" + +"When the lag for a remote partition exceeds the offset.lag.max value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" + +"\n" + +"Setting offset.lag.max to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" + Review Comment: > Setting offset.lag.max to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate Lowering the offset.lag.max to a non-zero value doesn't help when the flow is inconsistent. If a pause in the flow happens in-between syncs, the records after the last sync aren't translated. 
That was addressed in a separate improvement: https://issues.apache.org/jira/browse/KAFKA-15906 Lowering the offset.lag.max to a nonzero value is only necessary when the partition has a consistent low throughput, and a fixed sync every offset.flush.interval.ms isn't acceptable, which is a very contrived scenario. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ## @@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig { public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class; public static final String OFFSET_LAG_MAX = "offset.lag.max"; -private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; +private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between th
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142740321 It looks like testDescribeQuorumReplicationSuccessfu also fails for other PRs: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16154/1/tests. testTopicPartitionsArgWithInternalIncluded seems to be new.
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142730589 Will do when I get back to my computer. They indeed look suspicious.
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on PR #16120: URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142726604 Failures look like the usual suspects, but can we confirm the 4.0 ones?
Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]
edoardocomar commented on code in PR #16151: URL: https://github.com/apache/kafka/pull/16151#discussion_r1622769486 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); -FenceProducersHandler handler = new FenceProducersHandler(logContext); +FenceProducersHandler handler = new FenceProducersHandler(logContext, requestTimeoutMs); Review Comment: As an override to the AdminClient REQUEST_TIMEOUT_MS_CONFIG? The options doc says it's an override of the API timeout, but yes, we could use that too.
Re: [PR] KAFKA-16809: Run Javadoc in CI [kafka]
gharris1727 merged PR #16025: URL: https://github.com/apache/kafka/pull/16025
Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]
gharris1727 commented on code in PR #16151: URL: https://github.com/apache/kafka/pull/16151#discussion_r1622743028 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options) public FenceProducersResult fenceProducers(Collection transactionalIds, FenceProducersOptions options) { AdminApiFuture.SimpleAdminApiFuture future = FenceProducersHandler.newFuture(transactionalIds); -FenceProducersHandler handler = new FenceProducersHandler(logContext); +FenceProducersHandler handler = new FenceProducersHandler(logContext, requestTimeoutMs); Review Comment: There's a requestTimeoutMs in options, can you use that if specified?
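The suggestion is to use the timeout from the call's options when one is set and otherwise fall back to the client-wide value, while the reply points out that the options timeout is documented as overriding the API timeout rather than `request.timeout.ms`. A minimal sketch of the "override if set, else fall back" pattern only; `CallOptions` is a hypothetical stand-in assumed to carry a nullable `Integer` timeout the way admin option objects do, and which client setting the fallback should be remains the open question in the thread:

```java
// Sketch: prefer a per-call timeout, else fall back to the client-wide request timeout.
final class FenceTimeoutSketch {

    // Hypothetical stand-in for a per-call options object with a nullable timeout.
    static final class CallOptions {
        private Integer timeoutMs;

        CallOptions timeoutMs(final Integer timeoutMs) {
            this.timeoutMs = timeoutMs;
            return this;
        }

        Integer timeoutMs() {
            return timeoutMs;
        }
    }

    // Use the per-call timeout when it was set; otherwise use the client's configured value.
    static int effectiveTimeoutMs(final CallOptions options, final int clientRequestTimeoutMs) {
        final Integer override = options.timeoutMs();
        return override != null ? override : clientRequestTimeoutMs;
    }

    public static void main(final String[] args) {
        System.out.println(effectiveTimeoutMs(new CallOptions(), 30_000));                  // 30000
        System.out.println(effectiveTimeoutMs(new CallOptions().timeoutMs(5_000), 30_000)); // 5000
    }
}
```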
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
mjsax commented on code in PR #16148: URL: https://github.com/apache/kafka/pull/16148#discussion_r1622742983 ## docs/ops.html: ## @@ -3143,7 +3143,7 @@
Re: [PR] MINOR: update all-latency-avg documentation [kafka]
mjsax commented on code in PR #16148: URL: https://github.com/apache/kafka/pull/16148#discussion_r1622740601 ## docs/ops.html: ## @@ -3143,7 +3143,7 @@
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1622739622 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -159,9 +158,7 @@ default Set supportedGroupProtocols() { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); -// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || Review Comment: Got it. Let me run through the tests one more time with this understanding.
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
chia7712 commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1622737514 ## core/src/test/java/kafka/admin/AclCommandIntegrationTest.java: ## @@ -0,0 +1,453 @@ +package kafka.admin; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.*; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.acl.*; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.LogCaptureAppender; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.security.authorizer.AclEntry; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; +import scala.Console$; +import scala.collection.JavaConverters; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.AbstractMap.SimpleEntry; +import java.util.*; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; +import static java.util.Collections.*; +import static java.util.stream.Collectors.*; +import static java.util.stream.Stream.concat; +import static org.apache.kafka.common.acl.AclOperation.*; +import static org.apache.kafka.common.acl.AclPermissionType.ALLOW; +import static org.apache.kafka.common.acl.AclPermissionType.DENY; +import static org.apache.kafka.common.resource.PatternType.LITERAL; +import static org.apache.kafka.common.resource.PatternType.PREFIXED; +import static 
org.apache.kafka.common.resource.ResourceType.*; +import static org.apache.kafka.metadata.authorizer.StandardAuthorizer.SUPER_USERS_CONFIG; +import static org.apache.kafka.test.TestUtils.tempFile; +import static org.junit.jupiter.api.Assertions.*; +import static scala.collection.JavaConverters.setAsJavaSet; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults +@Tag("integration") +public class AclCommandIntegrationTest { +private final ClusterInstance cluster; + +private static final String AUTHORIZER = ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG; +private static final String ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"; +private static final String STANDARD_AUTHORIZER = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"; +private static final String SUPER_USERS = "super.users"; +private static final String USER_ANONYMOUS = "User:ANONYMOUS"; + +private static final KafkaPrincipal PRINCIPAL = SecurityUtils.parseKafkaPrincipal("User:test2"); Review Comment: OK, that will lead to a lot of duplicate code... Sorry, could we merge this test into `AclCommandTest`? We don't need to use the new test infra in this PR, and we can do a follow-up for that. Also, using Scala is OK with me for now.
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
chia7712 commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1622734164 ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: > Do you think it's worth migrating those tests to java at this stage, if they are going to be deleted anyway? We can keep the original test in this PR. The new test file you added looks good to me. > we can probably remove console output Current ACLs Agree.
Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]
chia7712 commented on code in PR #15830: URL: https://github.com/apache/kafka/pull/15830#discussion_r1622734164 ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: > Do you think it's worth migrating those tests to java at this stage, if they are going to be deleted anyway? We can keep the original test :) > we can probably remove console output Current ACLs ## core/src/main/scala/kafka/admin/AclCommand.scala: ## @@ -115,8 +115,6 @@ object AclCommand extends Logging { val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection adminClient.createAcls(aclBindings).all().get() } - -listAcls(adminClient) Review Comment: > Do you think it's worth migrating those tests to java at this stage, if they are going to be deleted anyway? We can keep the original test :) > we can probably remove console output Current ACLs Agree.
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
dajac commented on code in PR #16120: URL: https://github.com/apache/kafka/pull/16120#discussion_r1622733949 ## core/src/test/java/kafka/test/ClusterInstance.java: ## @@ -159,9 +158,7 @@ default Set supportedGroupProtocols() { Set supportedGroupProtocols = new HashSet<>(); supportedGroupProtocols.add(CLASSIC); -// KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG -if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") || Review Comment: Correct!
Re: [PR] KAFKA-16866: Used the right constant in RLMT#testFetchQuotaManagerConfig [kafka]
chia7712 merged PR #16152: URL: https://github.com/apache/kafka/pull/16152
Re: [PR] MINOR: Refactor DynamicConfig [kafka]
chia7712 merged PR #16133: URL: https://github.com/apache/kafka/pull/16133
Re: [PR] MINOR: speed up reset consumer group offset test [kafka]
chia7712 commented on code in PR #16155: URL: https://github.com/apache/kafka/pull/16155#discussion_r1622712633 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -62,7 +63,7 @@ static List forConsumerGroupCoordinator() { serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); return Collections.singletonList(ClusterConfig.defaultBuilder() -.setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) Review Comment: Could you please add comments for this change? ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -75,6 +76,7 @@ static List forClassicGroupCoordinator() { serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false"); return Collections.singletonList(ClusterConfig.defaultBuilder() +.setTypes(Stream.of(ZK, KRAFT).collect(Collectors.toSet())) Review Comment: ditto
Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]
apourchet commented on code in PR #16147: URL: https://github.com/apache/kafka/pull/16147#discussion_r1622591542 ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java: ## @@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final Set topicPa */ private static boolean canPerformRackAwareOptimization(final ApplicationState applicationState, final AssignedTask.Type taskType) { -final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy(); +final AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs(); +final String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy(); if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) { +LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", +rackAwareAssignmentStrategy); +return false; +} + +if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) { Review Comment: Unfortunately, `isEmpty` is only available from JDK 11: https://docs.oracle.com/en%2Fjava%2Fjavase%2F11%2Fdocs%2Fapi%2F%2F/java.base/java/util/Optional.html#isEmpty()
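Because `Optional.isEmpty()` does not exist before Java 11, the hunk negates `isPresent()` instead. A minimal Java 8-compatible sketch of the two early exits shown in the hunk; `RackAwareCheckSketch`, the `Optional<Integer>` parameter type, and the example strategy string `"min_traffic"` are assumptions for illustration, `STRATEGY_NONE` is a local stand-in for `StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE` with its value assumed to be `"none"`, and the logging and later checks of the real method are left out:

```java
import java.util.Optional;

// Sketch of the early-exit checks, written to compile on Java 8.
final class RackAwareCheckSketch {

    // Local stand-in for StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE; value assumed.
    static final String STRATEGY_NONE = "none";

    static boolean canPerformRackAwareOptimization(final String strategy, final Optional<Integer> trafficCost) {
        if (STRATEGY_NONE.equals(strategy)) {
            return false;
        }
        // Optional.isEmpty() only exists from Java 11; negating isPresent() also compiles on Java 8.
        if (!trafficCost.isPresent()) {
            return false;
        }
        // The real method goes on to further checks that the hunk does not show.
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(canPerformRackAwareOptimization("min_traffic", Optional.of(10)));  // true
        System.out.println(canPerformRackAwareOptimization("min_traffic", Optional.empty())); // false
        System.out.println(canPerformRackAwareOptimization("none", Optional.of(10)));         // false
    }
}
```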
[jira] [Comment Edited] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160 ] Ganesh Sadanala edited comment on KAFKA-16811 at 5/31/24 4:53 PM: -- [~sebviale] [~rohanpd] I have completed the implementation using the SlidingWindow approach with x=30 seconds for testing. Here are the changes: [https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99] I have followed these steps to test the changes, but I still see the punctuate-ratio as zero for all instances of the example Demo class. # Start ZooKeeper, Kafka Broker. # Created input and output topics with 3 partitions (for the sake of having active tasks distributed to multiple instances of WordCountProcessorDemo stream class) # {code:java} bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1 {code} 4. Run the 3 instances of Kafka Streams Demo Application in different terminals/processors: # {code:java} bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code} 5. Produce and consume data # {code:java} bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning{code} 6. Open the jconsole and watch the metrics. I see that all the metrics are getting calculated. When I run the debugger, I see that in this code tasks.activeTasks() is an empty list. Because of that, the punctuated count stays at zero, and so does the punctuate ratio.
TaskExecutor.java {code:java} int punctuate() { int punctuated = 0; for (final Task task : tasks.activeTasks()) { try { if (executionMetadata.canPunctuateTask(task)) { if (task.maybePunctuateStreamTime()) { punctuated++; } if (task.maybePunctuateSystemTime()) { punctuated++; } } } catch (final TaskMigratedException e) { log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " + "Will trigger a new rebalance and close all tasks as zombies together.", task.id()); throw e; } catch (final StreamsException e) { log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e); e.setTaskId(task.id()); throw e; } catch (final KafkaException e) { log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e); throw new StreamsException(e, task.id()); } } return punctuated; } } {code} Is there a way to make active tasks list non-empty, thus I can test the changes and write some unit tests? Is this behaviour normal in the local environment? was (Author: JIRAUSER305566): [~sebviale] [~rohanpd] I have completed the implementation using the SlidingWindow approach with x=30 seconds for testing. Here are the changes: [https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99] I have followed these steps to test the changes, but I still see the puncutate-ratio as zero for all the instances of example Demo class. # Start ZooKeeper, Kafka Broker. # Created input and output topics with 3 partitions (for the sake of having active tasks distributed to multiple instances of WordCountProcessorDemo stream class) # {code:java} bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1 {code} 4. 
Run the 3 instances of Kafka Streams Demo Application in different terminals/processors: # {code:java} bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code} 5. Produce and consume data # {code:java} bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning{code} 6. Open the jconsole and watch the metrics I see that all the metrics are getting calculated, but when in run the debugger, I see that in this code tasks.activeTasks() is empty list. TaskExecutor.java {code:java} int punctuate() {
Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]
cmccabe commented on PR #15945:
URL: https://github.com/apache/kafka/pull/15945#issuecomment-2142641154

rebased
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1622699466

## core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
         Set supportedGroupProtocols = new HashSet<>();
         supportedGroupProtocols.add(CLASSIC);
-        // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-        if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||

Review Comment:
Hmmm -- so when you say the new group coordinator -- you mean the code (which handles both protocols) and not the protocol itself. Do I have this right?

1. If the protocol is consumer -> new coordinator
2. If the config is enabled -> new coordinator
3. If protocol is not consumer && config is not enabled -> old coordinator
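The three rules in the comment above can be written down as a single decision function. The comment poses them as a question, so this is only a restatement of that reading, not confirmed coordinator behaviour. `usesNewGroupCoordinator` is a hypothetical helper. Its set parameter is assumed to model the enabled rebalance protocols (cf. `GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG` in the diff), and its boolean is assumed to model `NEW_GROUP_COORDINATOR_ENABLE_CONFIG`. The protocol names `"classic"` and `"consumer"` are also assumptions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class CoordinatorSelectionSketch {
    // Assumed protocol name for the new consumer rebalance protocol.
    static final String CONSUMER = "consumer";

    // Hypothetical helper restating rules 1-3 from the review comment.
    static boolean usesNewGroupCoordinator(final Set<String> rebalanceProtocols,
                                           final boolean newCoordinatorEnabled) {
        // Rule 1: the consumer protocol is enabled -> new coordinator.
        if (rebalanceProtocols.contains(CONSUMER)) {
            return true;
        }
        // Rule 2: the config is enabled -> new coordinator.
        // Rule 3: neither holds -> old coordinator.
        return newCoordinatorEnabled;
    }

    public static void main(final String[] args) {
        final Set<String> both = new HashSet<>(Arrays.asList("classic", CONSUMER));
        final Set<String> classicOnly = Collections.singleton("classic");
        System.out.println(usesNewGroupCoordinator(both, false));        // true
        System.out.println(usesNewGroupCoordinator(classicOnly, true));  // true
        System.out.println(usesNewGroupCoordinator(classicOnly, false)); // false
    }
}
```

Under this reading, the flag only matters when the consumer protocol is absent. That is why rules 1 and 2 collapse into an `||` in the test helper being changed.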
[jira] [Comment Edited] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160 ]

Ganesh Sadanala edited comment on KAFKA-16811 at 5/31/24 4:47 PM:
--

[~sebviale] [~rohanpd] I have completed the implementation using the SlidingWindow approach, with x=30 seconds for testing. Here are the changes: [https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

I followed these steps to test the changes, but I still see punctuate-ratio as zero for all instances of the example demo class.

1. Start ZooKeeper and the Kafka broker.
2. Create the input and output topics with 3 partitions (so that active tasks are distributed across multiple instances of the WordCountProcessorDemo stream class):
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
{code}
3. Run 3 instances of the Kafka Streams demo application in different terminals/processes:
{code:java}
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
{code}
4. Produce and consume data:
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
{code}
5. Open jconsole and watch the metrics.

I see that all the metrics are being calculated, but when I run the debugger, I see that tasks.activeTasks() in this code is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
    int punctuated = 0;

    for (final Task task : tasks.activeTasks()) {
        try {
            if (executionMetadata.canPunctuateTask(task)) {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
        } catch (final TaskMigratedException e) {
            log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (final StreamsException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            e.setTaskId(task.id());
            throw e;
        } catch (final KafkaException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            throw new StreamsException(e, task.id());
        }
    }

    return punctuated;
}
{code}

Is there a way to make the active tasks list non-empty, so that I can test the changes and write some unit tests? Is this behaviour normal in a local environment?

was (Author: JIRAUSER305566):
[~sebviale] I have completed the implementation using the SlidingWindow approach, with x=30 seconds for testing. Here are the changes: [https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

I followed these steps to test the changes, but I still see punctuate-ratio as zero for all instances of the example demo class.

1. Start ZooKeeper and the Kafka broker.
2. Create the input and output topics with 3 partitions (so that active tasks are distributed across multiple instances of the WordCountProcessorDemo stream class):
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
{code}
3. Run 3 instances of the Kafka Streams demo application in different terminals/processes:
{code:java}
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
{code}
4. Produce and consume data:
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
{code}
5. Open jconsole and watch the metrics.

I see that all the metrics are being calculated, but when I run the debugger, I see that tasks.activeTasks() in this code is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
    int punctuated = 0;
    for (final Task task : tasks.activeTasks()) {
        tr
Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]
jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1622699466

## core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
         Set supportedGroupProtocols = new HashSet<>();
         supportedGroupProtocols.add(CLASSIC);
-        // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-        if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||

Review Comment:
Hmmm -- so when you say the new group coordinator -- you mean the code and not the protocol. Do I have this right?

1. If the protocol is consumer -> new coordinator
2. If the config is enabled -> new coordinator
3. If protocol is not consumer && config is not enabled -> old coordinator

## core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
         Set supportedGroupProtocols = new HashSet<>();
         supportedGroupProtocols.add(CLASSIC);
-        // KafkaConfig#isNewGroupCoordinatorEnabled check both NEW_GROUP_COORDINATOR_ENABLE_CONFIG and GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-        if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true") ||

Review Comment:
Hmmm -- so when you say the new group coordinator -- you mean the code and not the protocol. Do I have this right?

1. If the protocol is consumer -> new coordinator
2. If the config is enabled -> new coordinator
3. If protocol is not consumer && config is not enabled -> old coordinator
[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160 ]

Ganesh Sadanala commented on KAFKA-16811:
-

[~sebviale] I have completed the implementation using the SlidingWindow approach, with x=30 seconds for testing. Here are the changes: [https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

I followed these steps to test the changes, but I still see punctuate-ratio as zero for all instances of the example demo class.

1. Start ZooKeeper and the Kafka broker.
2. Create the input and output topics with 3 partitions (so that active tasks are distributed across multiple instances of the WordCountProcessorDemo stream class):
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
{code}
3. Run 3 instances of the Kafka Streams demo application in different terminals/processes:
{code:java}
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
{code}
4. Produce and consume data:
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
{code}
5. Open jconsole and watch the metrics.

I see that all the metrics are being calculated, but when I run the debugger, I see that tasks.activeTasks() in this code is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
    int punctuated = 0;

    for (final Task task : tasks.activeTasks()) {
        try {
            if (executionMetadata.canPunctuateTask(task)) {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
        } catch (final TaskMigratedException e) {
            log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (final StreamsException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            e.setTaskId(task.id());
            throw e;
        } catch (final KafkaException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            throw new StreamsException(e, task.id());
        }
    }

    return punctuated;
}
{code}

Is there a way to make the active tasks list non-empty, so that I can test the changes and write some unit tests? Is this behaviour normal in a local environment?

> Punctuate Ratio metric almost impossible to track
> -
>
>                 Key: KAFKA-16811
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16811
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Sebastien Viale
>            Assignee: Ganesh Sadanala
>            Priority: Minor
>
> The Punctuate ratio metric is returned after the last record of the poll
> loop. It is recomputed in every poll loop.
> After a punctuate, the value is close to 1, but there is little chance that
> the metric is sampled at that moment.
> So its value is almost always 0.
> A solution could be to apply a kind of "sliding window" to it and report the
> value for the last x seconds.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
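The "sliding window" proposal in the issue description can be sketched as a ratio over all poll loops in the last `windowMs` milliseconds, rather than only the most recent loop. This is not the code in the linked commit, which is not reproduced here. The class name `SlidingPunctuateRatio`, its `record`/`ratio` methods, and the choice to sum punctuate time over total loop time are all assumptions for illustration. Time is passed in explicitly so that the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: punctuate ratio over the last windowMs milliseconds.
public class SlidingPunctuateRatio {
    private static final class Sample {
        final long timestampMs;
        final long punctuateMs;
        final long loopMs;

        Sample(final long timestampMs, final long punctuateMs, final long loopMs) {
            this.timestampMs = timestampMs;
            this.punctuateMs = punctuateMs;
            this.loopMs = loopMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long punctuateSumMs = 0L;
    private long loopSumMs = 0L;

    public SlidingPunctuateRatio(final long windowMs) {
        this.windowMs = windowMs;
    }

    // Called once per poll loop with the time spent punctuating and the total loop time.
    public void record(final long nowMs, final long punctuateMs, final long loopMs) {
        samples.addLast(new Sample(nowMs, punctuateMs, loopMs));
        punctuateSumMs += punctuateMs;
        loopSumMs += loopMs;
        evict(nowMs);
    }

    // Fraction of loop time spent punctuating within (nowMs - windowMs, nowMs].
    public double ratio(final long nowMs) {
        evict(nowMs);
        return loopSumMs == 0L ? 0.0 : (double) punctuateSumMs / loopSumMs;
    }

    // Drops samples that have aged out of the window and subtracts them from the sums.
    private void evict(final long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().timestampMs <= nowMs - windowMs) {
            final Sample old = samples.removeFirst();
            punctuateSumMs -= old.punctuateMs;
            loopSumMs -= old.loopMs;
        }
    }

    public static void main(final String[] args) {
        final SlidingPunctuateRatio ratio = new SlidingPunctuateRatio(30_000L);
        ratio.record(1_000L, 0L, 10L); // loop without punctuation
        ratio.record(2_000L, 8L, 10L); // loop that spent 8 ms punctuating
        ratio.record(3_000L, 0L, 10L);
        System.out.println(ratio.ratio(3_000L));  // about 0.267, versus 0.0 for the last loop alone
        System.out.println(ratio.ratio(32_000L)); // 0.0 once the punctuating loop has aged out
    }
}
```

With the 30-second window used for testing in the comment, a single punctuating loop keeps the reported value near 0.27 for 30 seconds. Without the window, the value drops back to 0 on the very next loop, which is the sampling problem the issue describes. Inside Kafka itself, a real patch would more likely build on the windowed statistics in `org.apache.kafka.common.metrics.stats` than on a hand-rolled deque.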