[PR] KAFKA-16861: Don't convert the group to classic if the size is larger than the group max size. [kafka]

2024-05-31 Thread via GitHub


frankvicky opened a new pull request, #16163:
URL: https://github.com/apache/kafka/pull/16163

   Fix the bug where the group is downgraded to a classic one when a member leaves, even though the consumer group size is still larger than `classicGroupMaxSize`.
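   
   For illustration, a minimal sketch (a hypothetical class, not the coordinator's actual code) of the size guard this fix implies:
   ```java
   // classicGroupMaxSize mirrors the broker config; everything else is illustrative.
   final class DowngradeGuard {
       private final int classicGroupMaxSize;
   
       DowngradeGuard(final int classicGroupMaxSize) {
           this.classicGroupMaxSize = classicGroupMaxSize;
       }
   
       // Only convert the consumer group to a classic group if, after the
       // member leaves, it still fits under the classic group size limit.
       boolean mayDowngrade(final int currentGroupSize) {
           return currentGroupSize - 1 <= classicGroupMaxSize;
       }
   }
   ```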
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-31 Thread via GitHub


chiacyu commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1623158977


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -1184,6 +1185,141 @@ public void testRestoreRestartRequestInconsistentState() {
 verify(configLog).stop();
 }
 
+@Test
+public void testPutTaskConfigsZeroTasks() throws Exception {
+when(configLog.partitionCount()).thenReturn(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+// Bootstrap as if we had already added the connector, but no tasks had been added yet
+whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
+
+// Null before writing
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+
+// Task configs should read to end, write to the log, read to end, write root.
+doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd();
+
+expectConvertWriteRead(
+COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+"tasks", 0); // We have 0 tasks
+
+configStorage.putTaskConfigs("connector1", Collections.emptyList());
+
+// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+configUpdateListener.onTaskConfigUpdate(Collections.emptyList());

Review Comment:
   Thanks for the example, I will apply it.






Re: [PR] MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16145:
URL: https://github.com/apache/kafka/pull/16145#issuecomment-2143311247

   @dongnuo123 There are failed tests that seem related to the patch. Could you 
please take a look?





[jira] [Created] (KAFKA-16876) TaskManager.handleRevocation doesn't handle errors thrown from task.prepareCommit

2024-05-31 Thread Rohan Desai (Jira)
Rohan Desai created KAFKA-16876:
---

 Summary: TaskManager.handleRevocation doesn't handle errors thrown 
from task.prepareCommit
 Key: KAFKA-16876
 URL: https://issues.apache.org/jira/browse/KAFKA-16876
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.0
Reporter: Rohan Desai


`TaskManager.handleRevocation` does not handle exceptions thrown by `task.prepareCommit`. In the particular instance I observed, `prepareCommit` flushed caches, which led to downstream `producer.send` calls that threw a `TaskMigratedException`. This means that the tasks that need to be revoked are not suspended by `handleRevocation`. `ConsumerCoordinator` stores the thrown exception and then moves on to the other task assignment callbacks. One of these, `StreamsPartitionAssigner.onCommit`, tries to close the tasks and raises an `IllegalStateException`. Fortunately, it dirty-closes the tasks if close fails, so we don't leak any tasks. I think there are maybe two bugs here:
 # `TaskManager.handleRevocation` should handle errors from `prepareCommit`. It should try not to leave any revoked tasks in an unsuspended state.
 # The `ConsumerCoordinator` just throws the first exception that it sees. But it seems bad to throw the `TaskMigratedException` and drop the `IllegalStateException` (though in this case I think it's relatively benign). I think on `IllegalStateException` we really want the streams thread to exit. One idea here is to have `ConsumerCoordinator` throw an exception type that includes the other exceptions it has seen in another field, but this breaks the contract for clients that catch specific exceptions. I'm not sure of a clean solution, but I think it's at least worth recording that it would be preferable to have the caller of `poll` handle all the thrown exceptions rather than just the first one.
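
A minimal sketch (the Task interface below is a stand-in, not the actual Streams code) of the first fix: attempt prepareCommit per task, remember the first error, and still suspend every revoked task:
{code:java}
import java.util.List;

// Sketch only: Task stands in for the real Streams task interface.
interface Task {
    void prepareCommit();
    void suspend();
}

final class RevocationSketch {
    static void handleRevocation(final List<Task> revokedActiveTasks) {
        RuntimeException firstError = null;
        for (final Task task : revokedActiveTasks) {
            try {
                task.prepareCommit();
            } catch (final RuntimeException e) {
                if (firstError == null) firstError = e; // remember the first failure, keep going
            } finally {
                task.suspend(); // never leave a revoked task unsuspended
            }
        }
        if (firstError != null) throw firstError; // rethrow only after all tasks are suspended
    }
}
{code}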

 

Here is the IllegalStateException stack trace I observed:
{code:java}
[       508.535] [service_application2] [inf] [ERROR] 2024-05-30 06:35:04.556 [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] TaskManager - stream-thread [e2e-c0a9810b-8b09-46bd-a6d0-f2678ce0a1f3-StreamThread-1] Failed to close task 0_3 cleanly. Attempting to close remaining tasks before re-throwing:
[       508.535] [service_application2] [inf] java.lang.IllegalStateException: Illegal state RUNNING while closing active task 0_3
[       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:673) ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamTask.closeClean(StreamTask.java:546) ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeTaskClean(TaskManager.java:1295) ~[kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:630) [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350) [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381) [kafka-streams-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) [kafka-clients-3.6.0.jar:?]
[       508.535] [service_application2] [inf] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) [kafka-clients-3.6.0.jar:?]
[ 

Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1623140655


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -1315,4 +1451,22 @@ private Map<String, Object> structToMap(Struct struct) {
 for (Field field : struct.schema().fields()) result.put(field.name(), struct.get(field));
 return result;
 }
+
+// Manually insert a connector into config storage, updating the task configs, connector config, and root config
+private void whiteBoxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {

Review Comment:
   we don't use `whiteBox` now, so that can be simplified:
   ```java
   private void addConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
       for (int i = 0; i < taskConfigs.size(); i++)
           configStorage.taskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
       configStorage.connectorConfigs.put(connectorName, connectorConfig);
       configStorage.connectorTaskCounts.put(connectorName, taskConfigs.size());
   }
   ```






Re: [PR] KAFKA-16223: Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #15989:
URL: https://github.com/apache/kafka/pull/15989#discussion_r1623140561


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreMockitoTest.java:
##
@@ -1184,6 +1185,141 @@ public void testRestoreRestartRequestInconsistentState() {
 verify(configLog).stop();
 }
 
+@Test
+public void testPutTaskConfigsZeroTasks() throws Exception {
+when(configLog.partitionCount()).thenReturn(1);
+
+configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
+verifyConfigure();
+configStorage.start();
+
+// Bootstrap as if we had already added the connector, but no tasks had been added yet
+whiteBoxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
+
+// Null before writing
+ClusterConfigState configState = configStorage.snapshot();
+assertEquals(-1, configState.offset());
+
+// Task configs should read to end, write to the log, read to end, write root.
+doAnswer(expectReadToEnd(Collections.emptyMap())).when(configLog).readToEnd();
+
+expectConvertWriteRead(
+COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+"tasks", 0); // We have 0 tasks
+
+configStorage.putTaskConfigs("connector1", Collections.emptyList());
+
+// As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+configUpdateListener.onTaskConfigUpdate(Collections.emptyList());

Review Comment:
   Please see the following example:
   
   ```java
   @Test
   public void testPutTaskConfigsZeroTasks() throws Exception {
       configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
       verifyConfigure();
       configStorage.start();
       verify(configLog).start();
   
       // Records to be read by consumer as it reads to the end of the log
       doAnswer(expectReadToEnd(new LinkedHashMap<>()))
           .doAnswer(expectReadToEnd(Collections.singletonMap(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0))))
           .when(configLog).readToEnd();
   
       expectConvertWriteRead(
               COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
               "tasks", 0); // We have 0 tasks
   
       // Bootstrap as if we had already added the connector, but no tasks had been added yet
       addConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.emptyList());
   
       // Null before writing
       ClusterConfigState configState = configStorage.snapshot();
       assertEquals(-1, configState.offset());
   
       // Writing task configs should block until all the writes have been performed and the root record update
       // has completed
       List<Map<String, String>> taskConfigs = Collections.emptyList();
       configStorage.putTaskConfigs("connector1", taskConfigs);
   
       // Validate root config by listing all connectors and tasks
       configState = configStorage.snapshot();
       assertEquals(1, configState.offset());
       String connectorName = CONNECTOR_IDS.get(0);
       assertEquals(Collections.singletonList(connectorName), new ArrayList<>(configState.connectors()));
       assertEquals(Collections.emptyList(), configState.tasks(connectorName));
       assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
   
       // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
       verify(configUpdateListener).onTaskConfigUpdate(Collections.emptyList());
   
       configStorage.stop();
       verify(configLog).stop();
   }
   ```
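   
   For readers following the migration, a minimal stand-alone sketch (the Listener interface is illustrative, not Connect's API) of how EasyMock's record/replay/verify style maps onto the Mockito stub-then-verify style used above:
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   
   public class MigrationSketch {
       interface Listener {
           void onUpdate(String name);
       }
   
       public static void main(String[] args) {
           Listener listener = mock(Listener.class);
           // EasyMock records expectations up front and then calls replay();
           // with Mockito we exercise the code first...
           listener.onUpdate("tasks"); // stands in for the code under test
           // ...and assert the interaction afterwards.
           verify(listener).onUpdate("tasks");
       }
   }
   ```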






Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-05-31 Thread via GitHub


ganesh-sadanala commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1623138681


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
 public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+"\n" +

Review Comment:
   New lines are not needed.






[PR] Sliding window approach to calculate non-zero punctuate-ratio metric [kafka]

2024-05-31 Thread via GitHub


ganesh-sadanala opened a new pull request, #16162:
URL: https://github.com/apache/kafka/pull/16162

   This pull request changes how the `punctuate-ratio` metric is calculated. The current implementation calculates the metric after the last record of the poll loop. Right after a punctuate the value is close to 1, but there is little chance that the metric is sampled at that moment, so its value is almost always 0.
   
   The updated implementation calculates the metric value over a sliding window of the last 30 seconds.
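   
   For illustration, a minimal sketch (not the PR's actual code) of a 30-second sliding-window ratio: keep timestamped samples of punctuate time versus total time, and report the ratio over whatever is still inside the window:
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   public class SlidingPunctuateRatio {
       private static final long WINDOW_MS = 30_000L;
   
       private static final class Sample {
           final long timestampMs, punctuateNanos, totalNanos;
           Sample(long timestampMs, long punctuateNanos, long totalNanos) {
               this.timestampMs = timestampMs;
               this.punctuateNanos = punctuateNanos;
               this.totalNanos = totalNanos;
           }
       }
   
       private final Deque<Sample> samples = new ArrayDeque<>();
   
       public synchronized void record(long nowMs, long punctuateNanos, long totalNanos) {
           samples.addLast(new Sample(nowMs, punctuateNanos, totalNanos));
           // Evict samples that have fallen out of the 30-second window.
           while (!samples.isEmpty() && samples.peekFirst().timestampMs < nowMs - WINDOW_MS) {
               samples.removeFirst();
           }
       }
   
       public synchronized double ratio() {
           long punctuate = 0, total = 0;
           for (Sample s : samples) {
               punctuate += s.punctuateNanos;
               total += s.totalNanos;
           }
           return total == 0 ? 0.0 : (double) punctuate / total;
       }
   }
   ```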
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-05-31 Thread via GitHub


ganesh-sadanala commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1623135976


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
 public static final Class<?> CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum allowed lag between the source and remote partitions before MirrorMaker initiates a resync operation to catch up the remote partition. The lag is calculated as the difference between the latest offset in the source partition and the last committed offset in the remote partition.\n" +
+"\n" +
+"When the lag for a remote partition exceeds the offset.lag.max value, MirrorMaker will initiate a resync operation to catch up the remote partition with the source partition. This involves reading records from the source partition starting from the last committed offset in the remote partition and writing them to the remote partition.\n" +
+"\n" +
+"Setting offset.lag.max to a lower value can be beneficial in scenarios where records may not flow constantly or at a consistent rate, as it ensures the remote partitions stay more closely in sync with the source partitions during periods of low throughput or inactivity. On the other hand, setting it to a higher value can be useful when the source topic has high throughput and the remote partitions can tolerate a larger lag.\n" +

Review Comment:
   Agree! 
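   
   For context, a hedged example of where this setting lives in a MirrorMaker 2 properties file (cluster aliases and the chosen value are illustrative, not recommendations):
   ```properties
   clusters = source, target
   source->target.enabled = true
   source->target.topics = .*
   # Resync a remote partition once it trails the source by more than 10 offsets.
   offset.lag.max = 10
   ```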






Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on PR #16158:
URL: https://github.com/apache/kafka/pull/16158#issuecomment-2143262308

   out of curiosity, do we have an IT for that option? I grepped the code base and it seems the related ITs are running with the old coordinator/protocol. For example:
   
   1. 
https://github.com/apache/kafka/blob/fb566e48bf05d749f8db8da803a3570acf25bb11/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L1213
   2. 
https://github.com/apache/kafka/blob/fb566e48bf05d749f8db8da803a3570acf25bb11/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala#L139





Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]

2024-05-31 Thread via GitHub


m1a2st commented on PR #16042:
URL: https://github.com/apache/kafka/pull/16042#issuecomment-2143228875

   I already rebased the code





Re: [PR] MINOR: speed up reset consumer group offset test [kafka]

2024-05-31 Thread via GitHub


m1a2st commented on PR #16155:
URL: https://github.com/apache/kafka/pull/16155#issuecomment-2143228699

   Ok, I resolved the conflict





Re: [PR] MINOR: speed up reset consumer group offset test [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on PR #16155:
URL: https://github.com/apache/kafka/pull/16155#issuecomment-2143225807

   @m1a2st Could you please fix the conflicts?





Re: [PR] KAFKA-16807: DescribeLogDirsResponseData#results#topics have unexpected topics having empty partitions [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on PR #16042:
URL: https://github.com/apache/kafka/pull/16042#issuecomment-2143225345

   @m1a2st Could you rebase the code to include the newest fixes?





Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623081838


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {

Review Comment:
   Overlooked this earlier I guess, these should all be private methods right? 
Pretty much everything other than #assign ?



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) {
-newAssignments.computeIfAbsent(client, k -> new HashSet<>());
-newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>());
-
-final Set<TaskId> newAssignmentsForClient = newAssignments.get(client)
-.stream().map(AssignedTask::id).collect(Collectors.toSet());
-
+final Set<TaskId> newAssignmentsForClient = newAssignments.get(client).tasks().keySet();
 taskPairs.addPairs(taskId, newAssignmentsForClient);
-newAssignments.get(client).add(new AssignedTask(taskId, type));
-newTaskLocations.get(taskId).add(client);
+
+newAssignments.get(client).assignTask(new AssignedTask(taskId, type));
+newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()).add(client);
 }
 
-public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() {
-final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>();
-for (final Map.Entry<ProcessId, Set<AssignedTask>> entry : newAssignments.entrySet()) {
-final ProcessId processId = entry.getKey();
-final Set<AssignedTask> assignedTasks = newAssignments.get(processId);
-final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks);
-kafkaStreamsAssignments.put(processId, assignment);
-}
-return kafkaStreamsAssignments;
+public Map<ProcessId, KafkaStreamsAssignment> newAssignments() {
+return newAssignments;
 }
 
 public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) {

Review Comment:
   now that `#processOptimizedAssignments` is only updating the 
`newTaskLocations`, can we simplify things further by just keeping the 
`newTaskLocations` map up to date as we move tasks around? That way we can get 
rid of `processOptimizedAssignments` altogether
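   
   A sketch of the suggested simplification (generic keys stand in for TaskId and ProcessId): keep the task-to-clients index correct at the moment a task is assigned or moved, so no separate `processOptimizedAssignments` pass is needed:
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   final class TaskLocations<T, P> {
       private final Map<T, Set<P>> locations = new HashMap<>();
   
       void assign(T task, P client) {
           locations.computeIfAbsent(task, k -> new HashSet<>()).add(client);
       }
   
       void move(T task, P from, P to) {
           // Update the index at the moment of the move; no later pass required.
           Set<P> clients = locations.computeIfAbsent(task, k -> new HashSet<>());
           clients.remove(from);
           clients.add(to);
       }
   }
   ```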



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -245,55 +251,46 @@ private AssignmentState(final ApplicationState applicationState,
 final int maxPairs = taskCount * (taskCount - 1) / 2;
 this.taskPairs = new TaskPairs(maxPairs);
 
-this.newTaskLocations = new HashMap<>();
-this.newAssignments = new HashMap<>();
+this.newTaskLocations = previousActiveAssignment.keySet().stream()
+.collect(Collectors.toMap(Function.identity(), taskId -> new HashSet<>()));
+this.newAssignments = clients.values().stream().collect(Collectors.toMap(
+KafkaStreamsState::processId,
+state -> KafkaStreamsAssignment.of(state.processId(), new HashSet<>())
+));
 }
 
 public void finalizeAssignment(final TaskId taskId, fin

Re: [PR] KAFKA-15305: The background thread should try to process the remaining task until the shutdown timer is expired. [kafka]

2024-05-31 Thread via GitHub


frankvicky commented on code in PR #16156:
URL: https://github.com/apache/kafka/pull/16156#discussion_r1622619577


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -298,7 +298,7 @@ private void sendUnsentRequests(final Timer timer) {
 do {

Review Comment:
   Cool. I think we don't need this early-return statement in the current situation, because we also need to consider whether there are any in-flight requests.
   The reason we had this statement here is that before, we didn't consider in-flight requests, so we could return when there were no unsent requests. 🤔
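   
   A minimal sketch of the shape being discussed (the Delegate interface is a stand-in modeled on the consumer internals, where "pending" covers both unsent and in-flight requests):
   ```java
   import org.apache.kafka.common.utils.Timer;
   
   public class DrainSketch {
       interface Delegate {
           void poll(long timeoutMs, long currentTimeMs);
           boolean hasAnyPendingRequests(); // unsent or in-flight
       }
   
       // Keep polling while the shutdown timer is live and requests remain.
       static void sendUnsentRequests(Delegate delegate, Timer timer) {
           do {
               delegate.poll(timer.remainingMs(), timer.currentTimeMs());
               timer.update();
           } while (timer.notExpired() && delegate.hasAnyPendingRequests());
       }
   }
   ```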






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623079947


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -173,6 +178,7 @@ private static void assignStandby(final ApplicationState 
applicationState,
   final AssignmentState assignmentState) {
 final Set statefulTasks = 
applicationState.allTasks().values().stream()
 .filter(TaskInfo::isStateful)

Review Comment:
   you can remove this line actually, the changelogs check is sufficient (not 
to mention I just realized this filter is probably broken at the moment anyways 
since we still need to fix the `stateStoreNames` thing)






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623078889


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -40,13 +41,17 @@
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StickyTaskAssignor implements TaskAssignor {
 private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
 
+public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

Review Comment:
   so weird. why did they name it "stateful"? both of these assignors, and 
AFAICT the configs themselves, pertain to both stateless and stateful tasks... 
🤷‍♀️ 






Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623064697


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java:
##
@@ -263,6 +267,8 @@ public 
Optional user
 final org.apache.kafka.streams.processor.assignment.TaskAssignor 
assignor = Utils.newInstance(userTaskAssignorClassname,
 
org.apache.kafka.streams.processor.assignment.TaskAssignor.class);
 log.info("Instantiated {} as the task assignor.", 
userTaskAssignorClassname);
+assignor.configure(streamsConfig.originals());
+log.info("Configured task assignor {} with the StreamsConfig.", 
userTaskAssignorClassname);

Review Comment:
   I think we only need to log something once about the assignor, the first one 
is good enough so I'd just remove this



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java:
##
@@ -45,8 +45,8 @@
 
 public class HighAvailabilityTaskAssignor implements TaskAssignor {
 private static final Logger log = 
LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
-private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
-private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
+public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;

Review Comment:
   ditto here: let's use a different variable name for the different classes, 
too confusing if we're mixing them up by calling them outside of this class 
now. So I guess `DEFAULT_HIGH_AVAILABILITY_XXX_COST` (or maybe just 
`DEFAULT_HA_XXX_COST`?)



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -1492,8 +1521,7 @@ public void onAssignment(final Assignment assignment, 
final ConsumerGroupMetadat
 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
 encodedNextScheduledRebalanceMs = Long.MAX_VALUE;
 break;
-case 6:
-validateActiveTaskEncoding(partitions, info, logPrefix);
+case 6: validateActiveTaskEncoding(partitions, info, logPrefix);

Review Comment:
   accidental formatting change?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -790,8 +813,14 @@ private UserTaskAssignmentListener 
assignTasksToClients(final Cluster fullMetada
 final org.apache.kafka.streams.processor.assignment.TaskAssignor 
assignor = userTaskAssignor.get();
 final TaskAssignment taskAssignment = 
assignor.assign(applicationState);
 final AssignmentError assignmentError = 
validateTaskAssignment(applicationState, taskAssignment);
-processStreamsPartitionAssignment(clientMetadataMap, 
taskAssignment);
-userTaskAssignmentListener = (assignment, subscription) -> 
assignor.onAssignmentComputed(assignment, subscription, assignmentError);
+processStreamsPartitionAssignment(assignor, taskAssignment, 
assignmentError, clientMetadataMap, groupSubscription);
+userTaskAssignmentListener = (assignment, subscription) -> {
+assignor.onAssignmentComputed(assignment, subscription, 
assignmentError);
+if (assignmentError != AssignmentError.NONE) {
+throw new StreamsException("Task assignment with " + 
assignor.getClass() +

Review Comment:
   ditto here: log an error before throwing and change assignor.getClass() to 
assignor.getClass().getName()



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -573,14 +586,23 @@ private ApplicationState buildApplicationState(final 
TopologyMetadata topologyMe
 ));
 
 return new DefaultApplicationState(
-assignmentConfigs.toPublicAssignmentConfigs(),
+publicAssignmentConfigs,
 logicalTasks,
 clientMetadataMap
 );
 }
 
-private static void processStreamsPartitionAssignment(final Map clientMetadataMap,
-  final TaskAssignment 
taskAssignment) {
+private static void processStreamsPartitionAssignment(final 
org.apache.kafka.streams.processor.assignment.TaskAssignor assignor,
+  final TaskAssignment 
taskAssignment,
+  final 
AssignmentError assignmentError,
+  final Map clientMetadataMap,
+  final 
GroupSubscription groupSubscription) {
+if (assignmentError == AssignmentError.UNKNOWN_PROCESS_ID || 
assignmentError == AssignmentError.UNKNOWN_TASK_ID) {
+assignor.onA

Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623063757


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,20 +32,47 @@ public class AssignmentConfigs {
 private final int numStandbyReplicas;
 private final long probingRebalanceIntervalMs;
 private final List<String> rackAwareAssignmentTags;
-private final int rackAwareTrafficCost;
-private final int rackAwareNonOverlapCost;
+private final OptionalInt rackAwareTrafficCost;
+private final OptionalInt rackAwareNonOverlapCost;
 private final String rackAwareAssignmentStrategy;
 
-public AssignmentConfigs(final StreamsConfig configs) {
-this(
-configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+public static AssignmentConfigs of(final StreamsConfig configs) {
+final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+Integer rackAwareTrafficCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
+Integer rackAwareNonOverlapCost = 
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
+
+final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+if (rackAwareTrafficCost == null) {
+rackAwareTrafficCost = 
StickyTaskAssignor.DEFAULT_STICKY_TRAFFIC_COST;
+}
+if (rackAwareNonOverlapCost == null) {
+rackAwareNonOverlapCost = 
StickyTaskAssignor.DEFAULT_STICKY_NON_OVERLAP_COST;
+}
+} else if 
(HighAvailabilityTaskAssignor.class.getName().equals(assignorClassName)) {
+// TODO KAFKA-16869: replace with the HighAvailabilityTaskAssignor 
class once it implements the new TaskAssignor interface
+if (rackAwareTrafficCost == null) {
+rackAwareTrafficCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST;
+}
+if (rackAwareNonOverlapCost == null) {
+rackAwareNonOverlapCost = 
HighAvailabilityTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST;
+}
+}
+
+return new AssignmentConfigs(
+acceptableRecoveryLag,
+maxWarmupReplicas,
+numStandbyReplicas,
+probingRebalanceIntervalMs,
+rackAwareAssignmentTags,
+OptionalInt.of(rackAwareTrafficCost),
+OptionalInt.of(rackAwareNonOverlapCost),

Review Comment:
   Does this need to be `ofNullable`?






[jira] [Updated] (KAFKA-15045) Move Streams task assignor to public configs

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15045:
---
Description: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

> Move Streams task assignor to public configs
> 
>
> Key: KAFKA-15045
> URL: https://issues.apache.org/jira/browse/KAFKA-15045
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams





Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623054265


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
 private final int numStandbyReplicas;
 private final long probingRebalanceIntervalMs;
 private final List<String> rackAwareAssignmentTags;
-private final int rackAwareTrafficCost;
-private final int rackAwareNonOverlapCost;
+private final OptionalInt rackAwareTrafficCost;
+private final OptionalInt rackAwareNonOverlapCost;
 private final String rackAwareAssignmentStrategy;
 
-public AssignmentConfigs(final StreamsConfig configs) {
-this(
-configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+public static AssignmentConfigs of(final StreamsConfig configs) {
+final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+Optional<Integer> rackAwareTrafficCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));

Review Comment:
   We need to map from Integer anyway, because otherwise the 
`OptionalInt.of(int)` construction will NPE.
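   
   A minimal illustration of that point (plain Java, independent of the Streams code): `OptionalInt.of(int)` unboxes its argument, so a null `Integer` throws NPE, and the mapping from a nullable `Integer` has to be explicit:
   ```java
   import java.util.OptionalInt;
   
   public class OptionalIntExample {
       static OptionalInt fromNullable(Integer value) {
           // OptionalInt has no ofNullable, so handle null by hand.
           return value == null ? OptionalInt.empty() : OptionalInt.of(value);
       }
   
       public static void main(String[] args) {
           System.out.println(fromNullable(null)); // OptionalInt.empty
           System.out.println(fromNullable(10));   // OptionalInt[10]
       }
   }
   ```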






[jira] [Created] (KAFKA-16875) Replace ClientState with TaskAssignment when creating individual consumer Assignments

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16875:
--

 Summary: Replace ClientState with TaskAssignment when creating 
individual consumer Assignments
 Key: KAFKA-16875
 URL: https://issues.apache.org/jira/browse/KAFKA-16875
 Project: Kafka
  Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman


In the initial implementation of KIP-924 in version 3.8, we converted from the new TaskAssignor's output type (TaskAssignment) into the old ClientState-based assignment representation. This allowed us to plug in a custom assignor without converting all of the internal mechanisms that take the KafkaStreams client-level assignment and process it into a consumer-level assignment.

However, we ultimately want to get rid of ClientState altogether, so we need to invert this logic: instead, convert the ClientState into a TaskAssignment, and then use the TaskAssignment to process the assigned tasks into consumer Assignments.





[jira] [Created] (KAFKA-16874) Remove old TaskAssignor interface

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16874:
--

 Summary: Remove old TaskAssignor interface
 Key: KAFKA-16874
 URL: https://issues.apache.org/jira/browse/KAFKA-16874
 Project: Kafka
  Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman


Once we have the new HAAssignor that implements the new TaskAssignor interface, 
we can remove the old TaskAssignor interface. 





[jira] [Created] (KAFKA-16873) Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16873:
--

 Summary: Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
 Key: KAFKA-16873
 URL: https://issues.apache.org/jira/browse/KAFKA-16873
 Project: Kafka
  Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman


Once we have all the out-of-the-box assignors implementing the new TaskAssignor 
interface that corresponds to the new public task assignor config, we can 
remove the old internal task assignor config altogether. 





[jira] [Created] (KAFKA-16872) Remove ClientState class

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16872:
--

 Summary: Remove ClientState class
 Key: KAFKA-16872
 URL: https://issues.apache.org/jira/browse/KAFKA-16872
 Project: Kafka
  Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman


One of the end-state goals of KIP-924 is to remove the ClientState class 
altogether. There are some blockers to this such as the removal of the old 
internal task assignor config and the old HAAssignor, so this ticket will 
probably be one of the very last KAFKA-16868 subtasks to be tackled.





[jira] [Created] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16871:
--

 Summary: Clean up internal AssignmentConfigs class in Streams
 Key: KAFKA-16871
 URL: https://issues.apache.org/jira/browse/KAFKA-16871
 Project: Kafka
  Issue Type: Sub-task
Reporter: A. Sophie Blee-Goldman


In KIP-924 we added a new public AssignmentConfigs class to hold all of the, 
you guessed it, assignment related configs.

However, there is an existing config class of the same name and largely the 
same contents but that's in an internal package, specifically inside the 
AssignorConfiguration class.

We should remove the old AssignmentConfigs class that's in 
AssignorConfiguration and replace any usages of it with the new public 
AssignmentConfigs that we added in KIP-924





Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623057182


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -40,13 +41,17 @@
 import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
 import org.apache.kafka.streams.processor.assignment.TaskAssignor;
 import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StickyTaskAssignor implements TaskAssignor {
 private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
 
+public static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
+public static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

Review Comment:
   I just copied these from the original internal StickyTaskAssignor, for the 
sake of consistency. I'll change them to `DEFAULT_STICKY_*` though.








Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


ableegoldman commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1623047318


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
 private final int numStandbyReplicas;
 private final long probingRebalanceIntervalMs;
 private final List<String> rackAwareAssignmentTags;
-private final int rackAwareTrafficCost;
-private final int rackAwareNonOverlapCost;
+private final OptionalInt rackAwareTrafficCost;
+private final OptionalInt rackAwareNonOverlapCost;
 private final String rackAwareAssignmentStrategy;
 
-public AssignmentConfigs(final StreamsConfig configs) {
-this(
-configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+public static AssignmentConfigs of(final StreamsConfig configs) {
+final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+final String rackAwareAssignmentStrategy = 
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
+Optional<Integer> rackAwareTrafficCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
+Optional<Integer> rackAwareNonOverlapCost = Optional.ofNullable(configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
+
+final String assignorClassName = 
configs.getString(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG);
+if (StickyTaskAssignor.class.getName().equals(assignorClassName)) {
+if (!rackAwareTrafficCost.isPresent()) {
+rackAwareTrafficCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_TRAFFIC_COST);
+}
+if (!rackAwareNonOverlapCost.isPresent()) {
+rackAwareNonOverlapCost = 
Optional.of(StickyTaskAssignor.DEFAULT_STATEFUL_NON_OVERLAP_COST);
+}
+}
+

Review Comment:
   nit: should be an else-if rather than two separate if statements



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java:
##
@@ -29,32 +33,59 @@ public class AssignmentConfigs {
 private final int numStandbyReplicas;
 private final long probingRebalanceIntervalMs;
 private final List<String> rackAwareAssignmentTags;
-private final int rackAwareTrafficCost;
-private final int rackAwareNonOverlapCost;
+private final OptionalInt rackAwareTrafficCost;
+private final OptionalInt rackAwareNonOverlapCost;
 private final String rackAwareAssignmentStrategy;
 
-public AssignmentConfigs(final StreamsConfig configs) {
-this(
-configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
-configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
-configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
-
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
-configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
-
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
-
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
+public static AssignmentConfigs of(final StreamsConfig configs) {
+final long acceptableRecoveryLag = 
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
+final int maxWarmupReplicas = 
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
+final int numStandbyReplicas = 
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+final long probingRebalanceIntervalMs = 
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
+final List<String> rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
+final String rackAwareAssign

[jira] [Created] (KAFKA-16870) Values.parseString returns objects which fail ConnectSchema.validateValue

2024-05-31 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16870:
---

 Summary: Values.parseString returns objects which fail 
ConnectSchema.validateValue
 Key: KAFKA-16870
 URL: https://issues.apache.org/jira/browse/KAFKA-16870
 Project: Kafka
  Issue Type: Task
  Components: connect
Reporter: Greg Harris


Values.parseString attempts to parse schema'd data out of blind strings. It 
opportunistically parses maps and arrays, and tries to find a common schema 
that all values can be cast to.

If parsing succeeds but the values don't have a common schema, the Values class 
emits containers with null inner schemas (schemaless elements, keys, or values).

These are not acceptable in ConnectSchema.validateValue, which currently throws 
an NPE, and after KAFKA-16858 will throw DataException. We should avoid 
producing bad data from the Values class (and the SimpleHeaderConverter which 
relies on it) which causes exceptions when used later, for example, as the 
value of a Struct.
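
For illustration, a rough sketch of the mismatch (whether this exact literal triggers it is an assumption; heterogeneous elements are the general shape of input that yields a container with a null inner schema):
{code:java}
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;

public class ParseExample {
    public static void main(String[] args) {
        // Elements with no common schema: the parsed ARRAY schema may carry a
        // null valueSchema(), which ConnectSchema.validateValue cannot accept.
        SchemaAndValue parsed = Values.parseString("[1, \"text\"]");
        System.out.println(parsed.schema());
    }
}
{code}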





[jira] [Created] (KAFKA-16869) Rewrite HighAvailabilityTaskAssignor to implement the new TaskAssignor interface

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16869:
--

 Summary: Rewrite HighAvailabilityTaskAssignor to implement the new 
TaskAssignor interface
 Key: KAFKA-16869
 URL: https://issues.apache.org/jira/browse/KAFKA-16869
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


We need to add a new HighAvailabilityTaskAssignor that implements the new 
TaskAssignor interface. Once we have that, we need to remember to also make 
these related changes:
 # Change the StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG default from null to the 
new HAAssignor
 # Check for this new HAAssignor type when evaluating the OptionalInt rack-aware assignment configs in the public AssignmentConfigs class. If these configs are Optional.empty() and the new HAAssignor is used, they should be overridden to the HAAssignor-specific default values. This code already exists but should be updated to check for the new HAAssignor class name instead of "null" (see the sketch after this list).
 # Until the old HAAssignor and old internal task assignor config can be 
removed completely, make sure the new HAAssignor is used by default when a 
TaskAssignor is selected in StreamsPartitionAssignor
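
For item 2, a sketch of the override logic, with hypothetical parameter names standing in for the real default constants; the class reference stands for the new HAAssignor and makes no claim about the actual method in AssignmentConfigs:

```
// Fall back to an assignor-specific default when the OptionalInt config is
// unset and the new HighAvailabilityTaskAssignor is configured.
static int effectiveCost(final OptionalInt configured,
                         final String configuredAssignorClassName,
                         final int haDefault,
                         final int genericDefault) {
    if (configured.isPresent()) {
        return configured.getAsInt();
    }
    final boolean isHaAssignor =
        configuredAssignorClassName.equals(HighAvailabilityTaskAssignor.class.getName());
    return isHaAssignor ? haDefault : genericDefault;
}
```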



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16868) Post KIP-924 StreamsPartitionAssignor code cleanup

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16868:
--

 Summary: Post KIP-924 StreamsPartitionAssignor code cleanup
 Key: KAFKA-16868
 URL: https://issues.apache.org/jira/browse/KAFKA-16868
 Project: Kafka
  Issue Type: Improvement
Reporter: A. Sophie Blee-Goldman


Making an umbrella task for all of the tech debt and code consolidation cleanup 
work that can/should be done following the implementation of [KIP-924: 
customizable task assignment for 
Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams]

Most of this revolves around deduplicating code once it's no longer needed, 
including classes like the ClientState, StandbyTaskAssignor and related 
elements, and the old TaskAssignor interface along with its implementations.

Note that in 3.8, the first version in which KIP-924 was released, we just 
added the new public config and new TaskAssignor interface but did not get rid 
of the old internal config or old TaskAssignor interface. If neither config is 
set in 3.8 we still default to the old HAAssignor, as a kind of opt-in feature 
flag, and internally will convert the output of the new TaskAssignor into the 
old style of ClientState-based assignment tracking. We intend to clean up all 
of the old code and eventually support only the new TaskAssignor interface as 
well as converting everything internally from the ClientState to the 
TaskAssignment/KafkaStreamsAssignment style output.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16858: Throw DataException from validateValue on array and map schemas without inner schemas [kafka]

2024-05-31 Thread via GitHub


gharris1727 opened a new pull request, #16161:
URL: https://github.com/apache/kafka/pull/16161

   The SchemaBuilder interface allows for objects with null valueSchema and/or 
keySchema to be constructed. These currently cause the validateValue to throw 
an NPE on non-empty containers. This changes empty and non-empty containers to 
throw DataException instead.
   
   The first commit rewrites some existing tests, and adds assertions for the 
prior behavior. Look at only changes in the later commits to see the actual 
behavior change.
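   
   For illustration, a sketch of the behavior change under test; the snippet is hypothetical and not taken from the PR:
   
   ```
   // An ARRAY schema constructed without an inner value schema.
   Schema arrayWithoutInnerSchema = new SchemaBuilder(Schema.Type.ARRAY).build();
   
   // Before this change: NPE on a non-empty container.
   // After this change: DataException for empty and non-empty containers alike.
   ConnectSchema.validateValue(arrayWithoutInnerSchema, Collections.singletonList("x"));
   ```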
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]

2024-05-31 Thread via GitHub


edoardocomar commented on code in PR #16151:
URL: https://github.com/apache/kafka/pull/16151#discussion_r1623013225


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
     public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
         AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
             FenceProducersHandler.newFuture(transactionalIds);
-        FenceProducersHandler handler = new FenceProducersHandler(logContext);
+        FenceProducersHandler handler = new FenceProducersHandler(logContext, requestTimeoutMs);

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-16047?focusedCommentId=17851019 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-05-31 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar reassigned KAFKA-16570:
-

Assignee: Edoardo Comar  (was: Justine Olshan)

> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Edoardo Comar
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the fence-producers case, we don't retry; instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 
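
For reference, a minimal usage sketch of the admin call under discussion (the bootstrap address and transactional.id are placeholders):

```
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // The fencing takes effect on the broker, but today a
    // CONCURRENT_TRANSACTIONS response is reported as an unexpected error.
    admin.fenceProducers(Collections.singleton("my-transactional-id")).all().get();
}
```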



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-31 Thread via GitHub


junrao commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1623011371


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -228,7 +231,7 @@ public enum MetadataVersion {
  * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
  * IT CANNOT BE CHANGED.
  */
-public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1;

Review Comment:
   Thanks, @CalvinConfluent .
   
   @clolov : In that case, we will need to (1) mark IBP_3_8_IV0 as unused, (2) 
create a new MV IBP_4_0_IV0, (3) replace existing references of IBP_3_8_IV0  
related to Elr like the following to IBP_4_0_IV0.
   
   ```
   public boolean isElrSupported() {
   return this.isAtLeast(IBP_3_8_IV0);
   }
   ```
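   
   i.e. with step (3) applied, the check above would become (a sketch of the proposal, not a committed change):
   
   ```
   public boolean isElrSupported() {
       return this.isAtLeast(IBP_4_0_IV0);
   }
   ```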



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-31 Thread via GitHub


CalvinConfluent commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1622979418


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -228,7 +231,7 @@ public enum MetadataVersion {
  * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
  * IT CANNOT BE CHANGED.
  */
-public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1;

Review Comment:
   Not yet. We are still finalizing the last PR. 
https://github.com/apache/kafka/pull/15702
   @clolov I don't want to block your feature, we should let KIP-966 use 
another metadata version and mark the IBP_3_8_IV0 as reserved.
   @junrao What is your suggestion here? Using a future metadata version like 
IBP_4_0_IV0 for KIP-966 at the moment? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-31 Thread via GitHub


CalvinConfluent commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1622977589


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -228,7 +231,7 @@ public enum MetadataVersion {
  * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
  * IT CANNOT BE CHANGED.
  */
-public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1;

Review Comment:
   Not yet. We are still finalizing the last PR. 
https://github.com/apache/kafka/pull/15702
   @clolov I don't want to block your feature, we should mark KIP-966 enabled 
for another metadata version.
   @junrao What is your suggestion here? Using a future metadata version like 
IBP_4_0_IV0 for KIP-966? Also, should I make a change separately or we can do 
it in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-31 Thread via GitHub


clolov commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1622968644


##
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##
@@ -55,7 +55,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   .build()
 
 val debugReplicaRequest = ListOffsetsRequest.Builder
-      .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+      .forReplica(ApiKeys.LIST_OFFSETS.latestVersion(), ListOffsetsRequest.DEBUGGING_REPLICA_ID)

Review Comment:
   We do not. I will raise another version tomorrow morning which addresses this and fixes the merge conflicts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16856) Zookeeper - Add new exception

2024-05-31 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov reassigned KAFKA-16856:
-

Assignee: Muralidhar Basani

> Zookeeper - Add new exception
> -
>
> Key: KAFKA-16856
> URL: https://issues.apache.org/jira/browse/KAFKA-16856
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Muralidhar Basani
>Priority: Major
>
> *Summary*
> Add TIERED_STORAGE_DISABLEMENT_IN_PROGRESS exception



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16867) Streams should run tag-based standby assignment based on rack ids

2024-05-31 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-16867:
--

 Summary: Streams should run tag-based standby assignment based on 
rack ids
 Key: KAFKA-16867
 URL: https://issues.apache.org/jira/browse/KAFKA-16867
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman


In KIP-708, we introduced a tag-based standby task assignment algorithm that 
runs if the user has configured their clients with "rack aware assignment 
tags". If no tags are configured, the default load-based standby task 
assignment algorithm is run instead.

In KIP-924 we introduced a different kind of rack-aware assignment logic which 
is based on the "rack.id" of the consumers and topic partitions. While this did 
not replace the tag-based rack-aware assignment of KIP-708 which had different 
(and opposing) goals, we realized that Streams could leverage the rack.ids to 
run the tag-based standby task assignment algorithm even if clients were not 
configured with assignment tags.

Unfortunately, during implementation of KIP-924, a bug in the logic meant that 
the tag-based algorithm was never actually being run based on the rack ids. 
This bug is present to this day and carried over (intentionally) during the 
task assignor refactoring of KIP-924. 

We should still fix this bug so that users can benefit from the resiliency of 
KIP-708 based on consumer rack ids, even if they did not explicitly opt-in by 
configuring clients with assignment tags, since KIP-708 is a net benefit with 
no downside



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16248) Kafka consumer should cache leader offset ranges

2024-05-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16248:
---

Assignee: Alieh Saeedi

> Kafka consumer should cache leader offset ranges
> 
>
> Key: KAFKA-16248
> URL: https://issues.apache.org/jira/browse/KAFKA-16248
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Brutschy
>Assignee: Alieh Saeedi
>Priority: Critical
>
> We noticed a streams application received an OFFSET_OUT_OF_RANGE error 
> following a network partition and streams task rebalance and subsequently 
> reset its offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
> stale metadata for P.
> 2. Consumer 1 fetches partition P with offset 50, epoch 8. commits the offset 
> 50 without an epoch.
> 3. The consumer group rebalances and P is now assigned to consumer 2. 
> Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). 
> Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a 
> zombie leader due to a network partition, the zombie leader may accept 
> consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 
> 2.
> If in step 1, consumer 1 committed the leader epoch for the message, then 
> when consumer 2 receives assignment P it would force a metadata refresh to 
> discover a sufficiently new leader epoch for the committed offset.
> Kafka Streams cannot fully determine the leader epoch of the offsets it wants 
> to commit - in EOS mode, streams commits the offset after the last control 
> records (to avoid always having a lag of >0), but the leader epoch of the 
> control record is not known to streams (since only non-control records are 
> returned from Consumer.poll).
> A fix discussed with [~hachikuji] is to have the consumer cache leader epoch 
> ranges, similar to how the broker maintains a leader epoch cache.
> This ticket was split from the original ticket 
> https://issues.apache.org/jira/browse/KAFKA-15344 which was described as a 
> streams fix, but the problem cannot be fully fixed in streams.
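
For context, a sketch of a commit that carries the leader epoch, using the existing OffsetAndMetadata constructor; the consumer, partition, offset, and epoch are placeholders, and the usual client imports are assumed:

```
static void commitWithEpoch(final Consumer<?, ?> consumer, final TopicPartition tp,
                            final long offset, final int leaderEpoch) {
    // Carrying the leader epoch forces the next owner of the partition to
    // refresh its metadata to at least this epoch before fetching.
    consumer.commitSync(Collections.singletonMap(tp,
        new OffsetAndMetadata(offset, Optional.of(leaderEpoch), "")));
}
```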



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16698) Fix flaky kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota

2024-05-31 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851200#comment-17851200
 ] 

Chia-Ping Tsai commented on KAFKA-16698:


[~muralibasani] Could you file a PR with your solution? I'd like to take a look 
and see what happens in QA.

> Fix flaky 
> kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota
> ---
>
> Key: KAFKA-16698
> URL: https://issues.apache.org/jira/browse/KAFKA-16698
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {code:java}
> org.opentest4j.AssertionFailedError: Timed out waiting for connection rate 
> update to propagateat 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)   
> at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138)  at 
> app//kafka.network.DynamicConnectionQuotaTest.updateIpConnectionRate(DynamicConnectionQuotaTest.scala:279)
>at 
> app//kafka.network.DynamicConnectionQuotaTest.testDynamicIpConnectionRateQuota(DynamicConnectionQuotaTest.scala:255)
>  at 
> java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>  at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
>at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-31 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851197#comment-17851197
 ] 

Chia-Ping Tsai commented on KAFKA-16639:


I have shipped the PR into trunk and 3.8

[~frankvicky] thanks for all your contributions. Nice fix!

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  
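
A minimal repro sketch, assuming props points at a broker and selects the new consumer protocol (e.g. group.protocol=consumer) along with the usual deserializer configs:

```
static void createAndCloseImmediately(final Properties props) {
    // close() should still send a leave-group heartbeat, even when a regular
    // heartbeat request is already in flight.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("topic"));
    }
}
```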



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16864) Copy on write in the Optimized Uniform Assignor

2024-05-31 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16864.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Copy on write in the Optimized Uniform Assignor
> ---
>
> Key: KAFKA-16864
> URL: https://issues.apache.org/jira/browse/KAFKA-16864
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ritika Reddy
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.8.0
>
>
> An optimization for the uniform (homogenous) assignor by avoiding creating a 
> copy of all the assignments. Instead, the assignor creates a copy only if the 
> assignment is updated. It is a sort of copy-on-write. This change reduces the 
> overhead of the TargetAssignmentBuilder when ran with the uniform 
> (homogenous) assignor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16860) Introduce `group.version` feature flag

2024-05-31 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16860.
-
Resolution: Fixed

> Introduce `group.version` feature flag
> --
>
> Key: KAFKA-16860
> URL: https://issues.apache.org/jira/browse/KAFKA-16860
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Optimize uniform (homogenous) assignor [kafka]

2024-05-31 Thread via GitHub


dajac merged PR #16088:
URL: https://github.com/apache/kafka/pull/16088


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16639) AsyncKafkaConsumer#close does not send heartbeat to leave group

2024-05-31 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16639.

Resolution: Fixed

> AsyncKafkaConsumer#close does not send heartbeat to leave group
> ---
>
> Key: KAFKA-16639
> URL: https://issues.apache.org/jira/browse/KAFKA-16639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> This bug can be reproduced by immediately closing a consumer which is just 
> created.
> The root cause is that we skip the new heartbeat used to leave group when 
> there is a in-flight heartbeat request 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java#L212).]
> It seems to me the simple solution is that we create a heartbeat request when 
> meeting above situation and then send it by pollOnClose 
> ([https://github.com/apache/kafka/blob/5de5d967adffd864bad3ec729760a430253abf38/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java#L62).]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16639: Ensure HeartbeatRequestManager generates leave request regardless of in-flight heartbeats. [kafka]

2024-05-31 Thread via GitHub


chia7712 merged PR #16017:
URL: https://github.com/apache/kafka/pull/16017


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16158:
URL: https://github.com/apache/kafka/pull/16158#discussion_r1622909655


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3852,6 +3853,17 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (exception != null) {
   requestHelper.sendMaybeThrottle(request, 
consumerGroupDescribeRequest.getErrorResponse(exception))
 } else {
+  if (request.header.apiVersion >= 3 && includeAuthorizedOperations) {

Review Comment:
   I suppose that we don't need the version check because this API only has 
version zero.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16252: Dynamic KRaft network manager and channel [kafka]

2024-05-31 Thread via GitHub


ahuang98 opened a new pull request, #16160:
URL: https://github.com/apache/kafka/pull/16160

   See https://github.com/apache/kafka/pull/15986 for description and all 
comment history, this addresses some of the comments from that PR (anything 
with 👀)
   
   Specifically, 
https://github.com/apache/kafka/pull/15986#discussion_r1619520419, 
https://github.com/apache/kafka/pull/15986#discussion_r1619518236, and
   > I don't think we want to change all of the tests to use 
controller.quorum.bootstrap.servers. We still need to support the old 
configuration. Let's make it so that tests using IBP_3_8_IV0 or newer use the 
new thing, and tests using an older MV use the old configuration. That way we 
will have good coverage.
   
   are not addressed in this PR. I think it would be alright for them to be 
addressed in a followup.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Adjust validateOffsetCommit/Fetch in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1622897443


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -832,7 +855,20 @@ public void validateOffsetFetch(
             throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
                 memberId, groupId));
         }
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+
+        if (member.useClassicProtocol()) {
+            try {
+                validateMemberEpoch(memberEpoch, member.memberEpoch());
+            } catch (StaleMemberEpochException ex) {
+                // StaleMemberEpochException is not supported in the classic protocol. We throw
+                // IllegalGenerationException instead for compatibility.
+                throw new IllegalGenerationException(String.format("Invalid offset commit because the "

Review Comment:
   nit: `offset fetch`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -832,7 +855,20 @@ public void validateOffsetFetch(
             throw new UnknownMemberIdException(String.format("Member %s is not a member of group %s.",
                 memberId, groupId));
         }
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+
+        if (member.useClassicProtocol()) {
+            try {
+                validateMemberEpoch(memberEpoch, member.memberEpoch());
+            } catch (StaleMemberEpochException ex) {
+                // StaleMemberEpochException is not supported in the classic protocol. We throw
+                // IllegalGenerationException instead for compatibility.
+                throw new IllegalGenerationException(String.format("Invalid offset commit because the "

Review Comment:
   I actually wonder if we could reuse the code too. We could define a helper method which takes the member, the received epoch, and the source (e.g. offset commit or offset fetch). What do you think?
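   
   For example, a sketch of such a helper; the name and placement are hypothetical:
   
   ```
   // Translate StaleMemberEpochException into IllegalGenerationException for
   // classic-protocol members, naming the operation in the message.
   private void validateClassicMemberEpoch(
       final ConsumerGroupMember member,
       final int receivedMemberEpoch,
       final String operation // "offset commit" or "offset fetch"
   ) {
       try {
           validateMemberEpoch(receivedMemberEpoch, member.memberEpoch());
       } catch (StaleMemberEpochException ex) {
           throw new IllegalGenerationException(String.format(
               "Invalid %s because the received generation id %d does not match "
                   + "the expected member epoch %d.",
               operation, receivedMemberEpoch, member.memberEpoch()));
       }
   }
   ```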



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16652: add unit test for ClusterTemplate offering zero ClusterConfig [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #15862:
URL: https://github.com/apache/kafka/pull/15862#discussion_r1622894813


##
core/src/test/java/kafka/test/junit/ClusterTestExtensionsUnitTest.java:
##
@@ -17,29 +17,64 @@
 
 package kafka.test.junit;
 
+import kafka.test.ClusterConfig;
 import kafka.test.annotation.ClusterTemplate;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ClusterTestExtensionsUnitTest {
+
+    static List<ClusterConfig> cfgEmpty() {
+        return Collections.emptyList();
+    }
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+private ExtensionContext buildExtensionContext(String methodName) throws 
Exception {
+ExtensionContext extensionContext = mock(ExtensionContext.class);
+Class clazz = ClusterTestExtensionsUnitTest.class;
+Method method = clazz.getDeclaredMethod(methodName);
+when(extensionContext.getRequiredTestClass()).thenReturn(clazz);
+when(extensionContext.getRequiredTestMethod()).thenReturn(method);
+return extensionContext;
+}
+
 @Test
-void testProcessClusterTemplate() {
+void testProcessClusterTemplate() throws Exception {
 ClusterTestExtensions ext = new ClusterTestExtensions();
-ExtensionContext context = mock(ExtensionContext.class);
+ExtensionContext context = buildExtensionContext("cfgEmpty");
 
 ClusterTemplate annot = mock(ClusterTemplate.class);
-        when(annot.value()).thenReturn("").thenReturn(" ");
+        when(annot.value()).thenReturn("").thenReturn(" ").thenReturn("cfgEmpty");
+
+Assertions.assertEquals(

Review Comment:
   This test is equal to next one, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [2/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac merged PR #16149:
URL: https://github.com/apache/kafka/pull/16149


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac merged PR #16120:
URL: https://github.com/apache/kafka/pull/16120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142887987

   I confirmed that the failed tests are not related. Merging to trunk and to 
3.8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142885524

   testTopicPartitionsArgWithInternalIncluded does not fail locally. It is also 
a flaky test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] SMT InsertField current timestamp [kafka]

2024-05-31 Thread via GitHub


andreit-geomotiv opened a new pull request, #16159:
URL: https://github.com/apache/kafka/pull/16159

   Kafka Connect single message transform for inserting field with current 
system timestamp value.
   ```
 "transforms": "insertCurrentTimestamp",
 "transforms.insertCurrentTimestamp.type": 
"org.apache.kafka.connect.transforms.InsertField$Value",
 "transforms.insertCurrentTimestamp.current.timestamp.field": 
"LOAD_TIMESTAMP_UTC"
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


sebastienviale commented on PR #16148:
URL: https://github.com/apache/kafka/pull/16148#issuecomment-2142880744

   Hmm, maybe a check is needed. If so, I will create another PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16158:
URL: https://github.com/apache/kafka/pull/16158#issuecomment-2142879003

   @riedelmax Thanks for the patch. Could you please extend unit and 
integration tests to cover this change?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Bump trunk to 3.9.0-SNAPSHOT [kafka]

2024-05-31 Thread via GitHub


jlprat commented on PR #16150:
URL: https://github.com/apache/kafka/pull/16150#issuecomment-2142879659

   Hi @jolshan 
   According to KIP-1012 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8.x+release)
 we agreed to have a version in the 3.x series with feature parity between KRaft and ZooKeeper. KIP-853 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes)
 and KIP-966 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas)
 were identified as must-haves. These two are not done yet, and it's not clear we can get them in before code freeze.
   
   For this reason I decided to err on the side of caution and assume there will be a 3.9.0. If we manage to get the desired parity, we can always change it to 4.0.0.
   
   Let me know if you have further questions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-05-31 Thread via GitHub


ganesh-sadanala commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1622879301


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
 public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+"\n" +
+"When the lag for a remote partition exceeds the 
offset.lag.max value, MirrorMaker will initiate a resync operation 
to catch up the remote partition with the source partition. This involves 
reading records from the source partition starting from the last committed 
offset in the remote partition and writing them to the remote partition.\n" +

Review Comment:
   Thank you for pointing this out and providing the feedback. You're correct!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-05-31 Thread via GitHub


ganesh-sadanala commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1622872574


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
 public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +

Review Comment:
   This clearly explains how the latest mapping record in the offset sync topic reflects the latest sync in the target/remote partition. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


mjsax commented on PR #16148:
URL: https://github.com/apache/kafka/pull/16148#issuecomment-2142860090

   Just wondering. This seems to also apply to `all-latency-max` and the other 
metrics over iterators, ie, range?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


mjsax commented on PR #16148:
URL: https://github.com/apache/kafka/pull/16148#issuecomment-2142858494

   Thanks for the PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


mjsax merged PR #16148:
URL: https://github.com/apache/kafka/pull/16148


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-31 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1622840595


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode 
child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
   Thanks everyone for your input ! I will make the change accordingly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-14509: [4/4] Handle includeAuthorizedOperations [kafka]

2024-05-31 Thread via GitHub


riedelmax opened a new pull request, #16158:
URL: https://github.com/apache/kafka/pull/16158

   Last PR for KAFKA-14509
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


sebastienviale commented on code in PR #16148:
URL: https://github.com/apache/kafka/pull/16148#discussion_r1622820325


##
docs/ops.html:
##
@@ -3143,7 +3143,7 @@ 

Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-31 Thread via GitHub


dongnuo123 commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1622806164


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -806,7 +809,29 @@ public void validateOffsetCommit(
 if (memberEpoch < 0 && members().isEmpty()) return;
 
         final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+        if (member.useClassicProtocol()) {
+            validateMemberInstanceId(member, groupInstanceId);

Review Comment:
   Let's remove this validation because we have to do 
`getOrMaybeCreateMember(memberId, false);` and know whether the member uses the 
classic protocol before deciding whether instance id should be validated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Adjust validateOffsetCommit in ConsumerGroup to ensure compatibility with classic protocol members [kafka]

2024-05-31 Thread via GitHub


dongnuo123 commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1622804714


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -806,7 +809,29 @@ public void validateOffsetCommit(
 if (memberEpoch < 0 && members().isEmpty()) return;
 
         final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+        if (member.useClassicProtocol()) {
+            validateMemberInstanceId(member, groupInstanceId);
+
+            try {
+                validateMemberEpoch(memberEpoch, member.memberEpoch());
+            } catch (StaleMemberEpochException ex) {
+                // StaleMemberEpochException is not supported in the classic protocol. We throw
+                // IllegalGenerationException instead for compatibility.
+                throw new IllegalGenerationException(String.format("Invalid offset commit because the "
+                    + "received generation id %d does not match the expected member epoch %d.",
+                    memberEpoch, member.memberEpoch()));
+            }
+
+            if (member.memberEpoch() < groupEpoch() ||
+                member.state() == MemberState.UNREVOKED_PARTITIONS ||
+                (member.state() == MemberState.UNRELEASED_PARTITIONS && !waitingOnUnreleasedPartition(member))) {
+                throw new RebalanceInProgressException(String.format("Invalid offset commit because" +
+                    " a new rebalance has been triggered in group %s and member %s should rejoin to catch up.",
+                    groupId(), member.memberId()));
+            }

Review Comment:
   Got it, now I know why the classic consumer offset commit validation doesn't 
check PREPARING_REBALANCE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16570) FenceProducers API returns "unexpected error" when successful

2024-05-31 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851171#comment-17851171
 ] 

Justine Olshan commented on KAFKA-16570:


Hey [~ecomar], thanks. If you don't mind assigning yourself to the JIRA, I can 
review it (y). I've been busy with other 3.8 stuff. 

> FenceProducers API returns "unexpected error" when successful
> -
>
> Key: KAFKA-16570
> URL: https://issues.apache.org/jira/browse/KAFKA-16570
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When we want to fence a producer using the admin client, we send an 
> InitProducerId request.
> There is logic in that API to fence (and abort) any ongoing transactions and 
> that is what the API relies on to fence the producer. However, this handling 
> also returns CONCURRENT_TRANSACTIONS. In normal usage, this is good because 
> we want to actually get a new producer ID and want to retry until the ID 
> is supplied or we time out.  
> [https://github.com/apache/kafka/blob/5193eb93237ba9093ae444d73a1eaa2d6abcc9c1/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L170]
>  
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1322]
>  
> In the fence-producers case, we don't retry; instead we have no handling 
> for concurrent transactions and log a message about an unexpected error.
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L112]
>  
> This is not unexpected though and the operation was successful. We should 
> just swallow this error and treat this as a successful run of the command. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-31 Thread via GitHub


junrao commented on code in PR #15673:
URL: https://github.com/apache/kafka/pull/15673#discussion_r1622789214


##
core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala:
##
@@ -55,7 +55,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   .build()
 
 val debugReplicaRequest = ListOffsetsRequest.Builder
-      .forReplica(ApiKeys.LIST_OFFSETS.latestVersion, ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+      .forReplica(ApiKeys.LIST_OFFSETS.latestVersion(), ListOffsetsRequest.DEBUGGING_REPLICA_ID)

Review Comment:
   Why do we need to add the brackets?



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -228,7 +231,7 @@ public enum MetadataVersion {
  * Think carefully before you update this value. ONCE A METADATA 
VERSION IS PRODUCTION,
  * IT CANNOT BE CHANGED.
  */
-public static final MetadataVersion LATEST_PRODUCTION = IBP_3_7_IV4;
+public static final MetadataVersion LATEST_PRODUCTION = IBP_3_8_IV1;

Review Comment:
   @CalvinConfluent and @cmccabe : Is IBP_3_8_IV0 for KIP-966 production ready?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Bump trunk to 3.9.0-SNAPSHOT [kafka]

2024-05-31 Thread via GitHub


jolshan commented on PR #16150:
URL: https://github.com/apache/kafka/pull/16150#issuecomment-2142758803

   Hmmm. I'm not sure I follow the reasoning for bumping to 3.9 rather than 
4.0. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15630 Improve documentation of offset.lag.max [kafka]

2024-05-31 Thread via GitHub


gharris1727 commented on code in PR #16080:
URL: https://github.com/apache/kafka/pull/16080#discussion_r1622763556


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
 public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+"\n" +
+"When the lag for a remote partition exceeds the 
offset.lag.max value, MirrorMaker will initiate a resync operation 
to catch up the remote partition with the source partition. This involves 
reading records from the source partition starting from the last committed 
offset in the remote partition and writing them to the remote partition.\n" +

Review Comment:
   > This involves reading records from the source partition starting from the 
last committed offset in the remote partition and writing them to the remote 
partition.
   
   This is incorrect, and I don't know where this information came from.
   
   When the lag for a remote partition exceeds offset.lag.max, it will emit an 
offset sync to the offset sync topic, which can then be used by the 
MirrorCheckpointTask to translate upstream and downstream offsets.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
 public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between the latest offset in the source partition 
and the last committed offset in the remote partition.\n" +
+"\n" +
+"When the lag for a remote partition exceeds the 
offset.lag.max value, MirrorMaker will initiate a resync operation 
to catch up the remote partition with the source partition. This involves 
reading records from the source partition starting from the last committed 
offset in the remote partition and writing them to the remote partition.\n" +
+"\n" +
+"Setting offset.lag.max to a lower value can be 
beneficial in scenarios where records may not flow constantly or at a 
consistent rate, as it ensures the remote partitions stay more closely in sync 
with the source partitions during periods of low throughput or inactivity. On 
the other hand, setting it to a higher value can be useful when the source 
topic has high throughput and the remote partitions can tolerate a larger 
lag.\n" +

Review Comment:
   > Setting offset.lag.max to a lower value can be beneficial in 
scenarios where records may not flow constantly or at a consistent rate
   
   Lowering the offset.lag.max to a non-zero value doesn't help when the flow 
is inconsistent. If a pause in the flow happens in-between syncs, the records 
after the last sync aren't translated. That was addressed in a separate 
improvement: https://issues.apache.org/jira/browse/KAFKA-15906
   
   Lowering the offset.lag.max to a nonzero value is only necessary when the 
partition has a consistent low throughput, and a fixed sync every 
offset.flush.interval.ms isn't acceptable, which is a very contrived scenario.
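   For that case, the two settings in play would look something like this (illustrative values only; offset.flush.interval.ms is a Connect worker setting, offset.lag.max a MirrorSourceConnector one):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Illustrative values, not recommendations:
   Map<String, String> settings = new HashMap<>();
   settings.put("offset.lag.max", "10");              // emit a sync after ~10 records of new lag
   settings.put("offset.flush.interval.ms", "60000"); // periodic sync happens regardless
   ```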



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##
@@ -101,7 +101,13 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
 public static final Class CONFIG_PROPERTY_FILTER_CLASS_DEFAULT = 
DefaultConfigPropertyFilter.class;
 
 public static final String OFFSET_LAG_MAX = "offset.lag.max";
-private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote 
partition can be before it is resynced.";
+private static final String OFFSET_LAG_MAX_DOC = "Determines the maximum 
allowed lag between the source and remote partitions before MirrorMaker 
initiates a resync operation to catch up the remote partition. The lag is 
calculated as the difference between th

Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142740321

   It looks like testDescribeQuorumReplicationSuccessful also fails for other 
PRs: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16154/1/tests.
 testTopicPartitionsArgWithInternalIncluded seems to be new.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142730589

   Will do when I get back to my computer. They indeed look suspicious.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


jolshan commented on PR #16120:
URL: https://github.com/apache/kafka/pull/16120#issuecomment-2142726604

   Failures look like the usual suspects, but can we confirm the 4.0 ones?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]

2024-05-31 Thread via GitHub


edoardocomar commented on code in PR #16151:
URL: https://github.com/apache/kafka/pull/16151#discussion_r1622769486


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4569,7 +4569,7 @@ public ListTransactionsResult 
listTransactions(ListTransactionsOptions options)
 public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
 AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
 FenceProducersHandler.newFuture(transactionalIds);
-FenceProducersHandler handler = new FenceProducersHandler(logContext);
+FenceProducersHandler handler = new FenceProducersHandler(logContext, 
requestTimeoutMs);

Review Comment:
   as an override to the AdminClient REQUEST_TIMEOUT_MS_CONFIG?
   The options doc says it's an override on the API timeout,
   but yes we could use that too
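   
   i.e. something like this sketch (using the handler constructor added in this PR; `options`, `logContext` and `requestTimeoutMs` as in the diff above):
   
   ```java
   // Prefer the per-call override from FenceProducersOptions when set,
   // otherwise fall back to the admin client's request.timeout.ms.
   Integer optionTimeoutMs = options.timeoutMs(); // AbstractOptions#timeoutMs(), may be null
   int effectiveTimeoutMs = optionTimeoutMs != null ? optionTimeoutMs : requestTimeoutMs;
   FenceProducersHandler handler = new FenceProducersHandler(logContext, effectiveTimeoutMs);
   ```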



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16809: Run Javadoc in CI [kafka]

2024-05-31 Thread via GitHub


gharris1727 merged PR #16025:
URL: https://github.com/apache/kafka/pull/16025


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers [kafka]

2024-05-31 Thread via GitHub


gharris1727 commented on code in PR #16151:
URL: https://github.com/apache/kafka/pull/16151#discussion_r1622743028


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -4569,7 +4569,7 @@ public ListTransactionsResult 
listTransactions(ListTransactionsOptions options)
 public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
 AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> future =
 FenceProducersHandler.newFuture(transactionalIds);
-FenceProducersHandler handler = new FenceProducersHandler(logContext);
+FenceProducersHandler handler = new FenceProducersHandler(logContext, 
requestTimeoutMs);

Review Comment:
   There's a requestTimeoutMs in options, can you use that if specified?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


mjsax commented on code in PR #16148:
URL: https://github.com/apache/kafka/pull/16148#discussion_r1622742983


##
docs/ops.html:
##
@@ -3143,7 +3143,7 @@ 

Re: [PR] MINOR: update all-latency-avg documentation [kafka]

2024-05-31 Thread via GitHub


mjsax commented on code in PR #16148:
URL: https://github.com/apache/kafka/pull/16148#discussion_r1622740601


##
docs/ops.html:
##
@@ -3143,7 +3143,7 @@ 

Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1622739622


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set<GroupProtocol> supportedGroupProtocols() {
 Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   Got it. Let me run through the tests one more time with this understanding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1622737514


##
core/src/test/java/kafka/admin/AclCommandIntegrationTest.java:
##
@@ -0,0 +1,453 @@
+package kafka.admin;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.*;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.acl.*;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.security.authorizer.AclEntry;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.Console$;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.*;
+import static java.util.stream.Collectors.*;
+import static java.util.stream.Stream.concat;
+import static org.apache.kafka.common.acl.AclOperation.*;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+import static org.apache.kafka.common.resource.PatternType.LITERAL;
+import static org.apache.kafka.common.resource.PatternType.PREFIXED;
+import static org.apache.kafka.common.resource.ResourceType.*;
+import static 
org.apache.kafka.metadata.authorizer.StandardAuthorizer.SUPER_USERS_CONFIG;
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.*;
+import static scala.collection.JavaConverters.setAsJavaSet;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults
+@Tag("integration")
+public class AclCommandIntegrationTest {
+private final ClusterInstance cluster;
+
+private static final String AUTHORIZER = 
ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
+private static final String ACL_AUTHORIZER = 
"kafka.security.authorizer.AclAuthorizer";
+private static final String STANDARD_AUTHORIZER = 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer";
+private static final String SUPER_USERS = "super.users";
+private static final String USER_ANONYMOUS = "User:ANONYMOUS";
+
+private static final KafkaPrincipal PRINCIPAL = 
SecurityUtils.parseKafkaPrincipal("User:test2");

Review Comment:
   ok, that will result in a lot of duplicate code ...
   
   Sorry, could we merge this test into `AclCommandTest`? We don't need to use the new test infra in this PR, and we can do a follow-up for that. Also, using Scala is ok to me for now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1622734164


##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   > Do you think it's worth migrating those tests to java at this stage, if 
they are going to be deleted anyway?
   
   we can keep the original test in this PR. The new test file you added is good to me
   
   > we can probably remove console output Current ACLs
   
   agree



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15713: KRaft support in AclCommandTest [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #15830:
URL: https://github.com/apache/kafka/pull/15830#discussion_r1622734164


##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   > Do you think it's worth migrating those tests to java at this stage, if 
they are going to be deleted anyway?
   
   we can keep the original test :)
   
   > we can probably remove console output Current ACLs



##
core/src/main/scala/kafka/admin/AclCommand.scala:
##
@@ -115,8 +115,6 @@ object AclCommand extends Logging {
   val aclBindings = acls.map(acl => new AclBinding(resource, 
acl)).asJavaCollection
   adminClient.createAcls(aclBindings).all().get()
 }
-
-listAcls(adminClient)

Review Comment:
   > Do you think it's worth migrating those tests to java at this stage, if 
they are going to be deleted anyway?
   
   we can keep the original test :)
   
   > we can probably remove console output Current ACLs
   
   agree



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


dajac commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1622733949


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set<GroupProtocol> supportedGroupProtocols() {
 Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   Correct!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16866: Used the right constant in RLMT#testFetchQuotaManagerConfig [kafka]

2024-05-31 Thread via GitHub


chia7712 merged PR #16152:
URL: https://github.com/apache/kafka/pull/16152


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor DynamicConfig [kafka]

2024-05-31 Thread via GitHub


chia7712 merged PR #16133:
URL: https://github.com/apache/kafka/pull/16133


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: speed up reset consumer group offset test [kafka]

2024-05-31 Thread via GitHub


chia7712 commented on code in PR #16155:
URL: https://github.com/apache/kafka/pull/16155#discussion_r1622712633


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -62,7 +63,7 @@ static List<ClusterConfig> forConsumerGroupCoordinator() {
 serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true");
 
 return Collections.singletonList(ClusterConfig.defaultBuilder()
-.setTypes(Stream.of(KRAFT, 
CO_KRAFT).collect(Collectors.toSet()))

Review Comment:
   Could you please add comments for this change?



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -75,6 +76,7 @@ static List<ClusterConfig> forClassicGroupCoordinator() {
 serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false");
 
 return Collections.singletonList(ClusterConfig.defaultBuilder()
+.setTypes(Stream.of(ZK, KRAFT).collect(Collectors.toSet()))

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 16) TaskAssignor.onAssignmentComputed handling [kafka]

2024-05-31 Thread via GitHub


apourchet commented on code in PR #16147:
URL: https://github.com/apache/kafka/pull/16147#discussion_r1622591542


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##
@@ -481,10 +479,24 @@ private static int getCrossRackTrafficCost(final 
Set topicPa
  */
 private static boolean canPerformRackAwareOptimization(final 
ApplicationState applicationState,
final 
AssignedTask.Type taskType) {
-final String rackAwareAssignmentStrategy = 
applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
+final AssignmentConfigs assignmentConfigs = 
applicationState.assignmentConfigs();
+final String rackAwareAssignmentStrategy = 
assignmentConfigs.rackAwareAssignmentStrategy();
 if 
(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy))
 {
+LOG.warn("Rack aware task assignment optimization disabled: rack 
aware strategy was set to {}",
+rackAwareAssignmentStrategy);
+return false;
+}
+
+if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {

Review Comment:
   `isEmpty` is JDK 11 unfortunately:
   
https://docs.oracle.com/en%2Fjava%2Fjavase%2F11%2Fdocs%2Fapi%2F%2F/java.base/java/util/Optional.html#isEmpty()
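   
   So on the JDK 8 baseline the negated form is what's available:
   
   ```java
   import java.util.Optional;
   
   Optional<Integer> trafficCost = Optional.empty();
   // Optional#isEmpty() only exists since JDK 11; the negated isPresent()
   // is the JDK 8-compatible equivalent.
   if (!trafficCost.isPresent()) {
       // skip the rack-aware optimization
   }
   ```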



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16811) Punctuate Ratio metric almost impossible to track

2024-05-31 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160
 ] 

Ganesh Sadanala edited comment on KAFKA-16811 at 5/31/24 4:53 PM:
--

[~sebviale] [~rohanpd] I have completed the implementation using the 
SlidingWindow approach with x=30 seconds for testing. Here are the changes: 
[https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

 

I have followed these steps to test the changes, but I still see the punctuate-ratio as zero for all the instances of the example Demo class. 
 # Start ZooKeeper and the Kafka broker.
 # Created input and output topics with 3 partitions (for the sake of having 
active tasks distributed to multiple instances of WordCountProcessorDemo stream 
class) 
 # 
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-wordcount-output --partitions 3 --replication-factor 1 {code}
4. Run the 3 instances of the Kafka Streams demo application in different 
terminals/processes:

 # 
{code:java}
bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code}
5. Produce and consume data

 # 
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
streams-wordcount-output --from-beginning{code}

       6. Open the jconsole and watch the metrics

 

I see that all the metrics are getting calculated. When I run the debugger, I 
see that in this code tasks.activeTasks() is an empty list. Because of that, 
the punctuated count stays zero, and hence so does the punctuate ratio.

TaskExecutor.java
{code:java}
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
try {
if (executionMetadata.canPunctuateTask(task)) {
if (task.maybePunctuateStreamTime()) {
punctuated++;
}
if (task.maybePunctuateSystemTime()) {
punctuated++;
}
}
} catch (final TaskMigratedException e) {
log.info("Failed to punctuate stream task {} since it got 
migrated to another thread already. " +
"Will trigger a new rebalance and close all tasks as 
zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
e.setTaskId(task.id());
throw e;
} catch (final KafkaException e) {
log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
throw new StreamsException(e, task.id());
}
}

return punctuated;
}
} {code}
Is there a way to make the active tasks list non-empty, so that I can test the 
changes and write some unit tests?

 

Is this behaviour normal in the local environment?
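
One thing I noticed while digging: a task only does punctuation work if its processor registered a punctuator, and (if I read the demo right) WordCountProcessorDemo schedules a STREAM_TIME one, which fires only as input records advance stream time. A wall-clock punctuator fires even without traffic; a minimal sketch:
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.ProcessorContext;

// Minimal sketch: register a wall-clock punctuator in Processor#init so the
// task has punctuation work even when no records are flowing.
@Override
public void init(final ProcessorContext<String, String> context) {
    context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> System.out.println("punctuated at " + timestamp));
}
{code}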


was (Author: JIRAUSER305566):
[~sebviale] [~rohanpd] I have completed the implementation using the 
SlidingWindow approach with x=30 seconds for testing. Here are the changes: 
[https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

 

I have followed these steps to test the changes, but I still see the punctuate-ratio as zero for all the instances of the example Demo class. 
 # Start ZooKeeper and the Kafka broker.
 # Created input and output topics with 3 partitions (for the sake of having 
active tasks distributed to multiple instances of WordCountProcessorDemo stream 
class) 
 # 
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-wordcount-output --partitions 3 --replication-factor 1 {code}
4. Run the 3 instances of the Kafka Streams demo application in different 
terminals/processes:

 # 
{code:java}
bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code}
5. Produce and consume data

 # 
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
streams-wordcount-output --from-beginning{code}

       6. Open the jconsole and watch the metrics

 

I see that all the metrics are getting calculated, but when I run the 
debugger, I see that in this code tasks.activeTasks() is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
  

Re: [PR] KAFKA-16757: Fix broker re-registration issues around MV 3.7-IV2 [kafka]

2024-05-31 Thread via GitHub


cmccabe commented on PR #15945:
URL: https://github.com/apache/kafka/pull/15945#issuecomment-2142641154

   rebased


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1622699466


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set<GroupProtocol> supportedGroupProtocols() {
 Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   Hmmm -- so when you say the new group coordinator -- you mean the code 
(which handles both protocols) and not the protocol itself.
   
   Do I have this right?
   1. If the protocol is consumer -> new coordinator
   2. If the config is enabled -> new coordinator
   3. If protocol is not consumer && config is not enabled  -> old coordinator
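   
   Paraphrasing that as a sketch (not the real code, just the three cases above; `serverProperties` and the config constants are from the diff in this thread):
   
   ```java
   // Sketch of KafkaConfig#isNewGroupCoordinatorEnabled as I understand it:
   boolean rebalanceProtocolsIncludeConsumer =
           serverProperties.getOrDefault(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic")
                           .contains("consumer");                                     // case 1
   boolean newCoordinatorConfigEnabled =
           serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "").equals("true"); // case 2
   boolean newCoordinator = rebalanceProtocolsIncludeConsumer || newCoordinatorConfigEnabled;
   // case 3: neither holds -> old group coordinator
   ```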



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16811) Punctuate Ratio metric almost impossible to track

2024-05-31 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160
 ] 

Ganesh Sadanala edited comment on KAFKA-16811 at 5/31/24 4:47 PM:
--

[~sebviale] [~rohanpd] I have completed the implementation using the 
SlidingWindow approach with x=30 seconds for testing. Here are the changes: 
[https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

 

I have followed these steps to test the changes, but I still see the punctuate-ratio as zero for all the instances of the example Demo class. 
 # Start ZooKeeper and the Kafka broker.
 # Created input and output topics with 3 partitions (for the sake of having 
active tasks distributed to multiple instances of WordCountProcessorDemo stream 
class) 
 # 
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-wordcount-output --partitions 3 --replication-factor 1 {code}
4. Run the 3 instances of the Kafka Streams demo application in different 
terminals/processes:

 # 
{code:java}
bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code}
5. Produce and consume data

 # 
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
streams-wordcount-output --from-beginning{code}

       6. Open the jconsole and watch the metrics

 

I see that all the metrics are getting calculated, but when I run the 
debugger, I see that in this code tasks.activeTasks() is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
try {
if (executionMetadata.canPunctuateTask(task)) {
if (task.maybePunctuateStreamTime()) {
punctuated++;
}
if (task.maybePunctuateSystemTime()) {
punctuated++;
}
}
} catch (final TaskMigratedException e) {
log.info("Failed to punctuate stream task {} since it got 
migrated to another thread already. " +
"Will trigger a new rebalance and close all tasks as 
zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
e.setTaskId(task.id());
throw e;
} catch (final KafkaException e) {
log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
throw new StreamsException(e, task.id());
}
}

return punctuated;
}
} {code}
Is there a way to make the active tasks list non-empty, so that I can test the 
changes and write some unit tests?

 

Is this behaviour normal in the local environment?


was (Author: JIRAUSER305566):
[~sebviale] I have completed the implementation using the SlidingWindow 
approach with x=30 seconds for testing. Here are the changes: 
[https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

 

I have followed these steps to test the changes, but I still see the punctuate-ratio as zero for all the instances of the example Demo class. 
 # Start ZooKeeper and the Kafka broker.
 # Created input and output topics with 3 partitions (for the sake of having 
active tasks distributed to multiple instances of WordCountProcessorDemo stream 
class) 
 # 
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-wordcount-output --partitions 3 --replication-factor 1 {code}
4. Run the 3 instances of the Kafka Streams demo application in different 
terminals/processes:
 # 
{code:java}
bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code}
5. Produce and consume data
 # 
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
streams-wordcount-output --from-beginning{code}

       6. Open the jconsole and watch the metrics

 

I see that all the metrics are getting calculated, but when I run the 
debugger, I see that in this code tasks.activeTasks() is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
tr

Re: [PR] KAFKA-16860; [1/2] Introduce group.version feature flag [kafka]

2024-05-31 Thread via GitHub


jolshan commented on code in PR #16120:
URL: https://github.com/apache/kafka/pull/16120#discussion_r1622699466


##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set supportedGroupProtocols() {
 Set supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   Hmmm -- so when you say the new group coordinator -- you mean the code and 
not the protocol.
   
   Do I have this right?
   1. If the protocol is consumer -> new coordinator
   2. If the config is enabled -> new coordinator
   3. If protocol is not consumer && config is not enabled  -> old coordinator



##
core/src/test/java/kafka/test/ClusterInstance.java:
##
@@ -159,9 +158,7 @@ default Set<GroupProtocol> supportedGroupProtocols() {
 Set<GroupProtocol> supportedGroupProtocols = new HashSet<>();
 supportedGroupProtocols.add(CLASSIC);
 
-// KafkaConfig#isNewGroupCoordinatorEnabled check both 
NEW_GROUP_COORDINATOR_ENABLE_CONFIG and 
GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG
-if (serverProperties.getOrDefault(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
"").equals("true") ||

Review Comment:
   Hmmm -- so when you say the new group coordinator -- you mean the code and 
not the protocol.
   
   Do I have this right?
   1. If the protocol is consumer -> new coordinator
   2. If the config is enabled -> new coordinator
   3. If protocol is not consumer && config is not enabled  -> old coordinator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track

2024-05-31 Thread Ganesh Sadanala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160
 ] 

Ganesh Sadanala commented on KAFKA-16811:
-

[~sebviale] I have completed the implementation using the SlidingWindow 
approach with x=30 seconds for testing. Here are the changes: 
[https://github.com/ganesh-sadanala/kafka/commit/20873613b4f94cea5de32382e7e30cde25ef4c99]

 

I have followed these steps to test the changes, but I still see the punctuate-ratio as zero for all the instances of the example Demo class. 
 # Start ZooKeeper and the Kafka broker.
 # Created input and output topics with 3 partitions (for the sake of having 
active tasks distributed to multiple instances of WordCountProcessorDemo stream 
class) 
 # 
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic 
streams-wordcount-output --partitions 3 --replication-factor 1 {code}
4. Run the 3 instances of the Kafka Streams demo application in different 
terminals/processes:
 # 
{code:java}
bin/kafka-run-class.sh 
org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo {code}
5. Produce and consume data
 # 
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic 
streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 
streams-wordcount-output --from-beginning{code}

       6. Open the jconsole and watch the metrics

 

I see that all the metrics are getting calculated, but when I run the 
debugger, I see that in this code tasks.activeTasks() is an empty list.

TaskExecutor.java
{code:java}
int punctuate() {
int punctuated = 0;

for (final Task task : tasks.activeTasks()) {
try {
if (executionMetadata.canPunctuateTask(task)) {
if (task.maybePunctuateStreamTime()) {
punctuated++;
}
if (task.maybePunctuateSystemTime()) {
punctuated++;
}
}
} catch (final TaskMigratedException e) {
log.info("Failed to punctuate stream task {} since it got 
migrated to another thread already. " +
"Will trigger a new rebalance and close all tasks as 
zombies together.", task.id());
throw e;
} catch (final StreamsException e) {
log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
e.setTaskId(task.id());
throw e;
} catch (final KafkaException e) {
log.error("Failed to punctuate stream task {} due to the 
following error:", task.id(), e);
throw new StreamsException(e, task.id());
}
}

return punctuated;
}
} {code}
Is there a way to make the active tasks list non-empty, so that I can test the 
changes and write some unit tests?

 

Is this behaviour normal in the local environment?

> Punctuate Ratio metric almost impossible to track
> -
>
> Key: KAFKA-16811
> URL: https://issues.apache.org/jira/browse/KAFKA-16811
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sebastien Viale
>Assignee: Ganesh Sadanala
>Priority: Minor
>
> The Punctuate ratio metric is returned after the last record of the poll 
> loop. It is recomputed in every poll loop.
> After a puntuate, the value is close to 1, but there is little chance that 
> metric is sampled at this time. 
> So its value is almost always 0.   
> A solution could be to apply a kind of "sliding window" to it and report the 
> value for the last x seconds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

