[jira] [Resolved] (KAFKA-14957) Default value for state.dir is confusing

2024-02-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14957.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Assignee: Owen C.H. Leung
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 3.8.0
>
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading, the value will be different in each environment as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.
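For reference, a minimal sketch of how that default is derived (illustrative
only; the actual constant lives in StreamsConfig):
{code:java}
// The documented default embeds the JVM's temp dir, so it differs per machine:
String defaultStateDir =
    System.getProperty("java.io.tmpdir") + java.io.File.separator + "kafka-streams";
{code}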



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816238#comment-17816238
 ] 

Matthias J. Sax commented on KAFKA-16241:
-

{quote} Detected out-of-order KTable update
{quote}
I would hope that enabling versioned stores by default would fix this, but there 
is no real timeline for that... (we could also add a sensor for it and remove the 
log line – we did discuss this in the past with no resolution... not sure...)
{quote}Detected that shutdown was requested. All clients in this app will now 
begin to shutdown
{quote}
Interesting – I did not look into the logs in detail but just collected them – 
it's on the main processing loop, so no wonder we see this so often; I guess 
`taskManager.rebalanceInProgress()` just returns `true` for a longer period of 
time (I would assume that `isRunning()` is already `false`) – maybe a simple 
static boolean flag to execute `maybeSendShutdown();` only once would be 
sufficient to address this?
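Something like the following is what I have in mind (a sketch only; the field
and wrapper method are assumptions, not actual StreamThread code):
{code:java}
// Send the shutdown request at most once, even if the main processing loop
// keeps hitting this branch while a rebalance is in progress.
private boolean shutdownRequestSent = false;

private void maybeSendShutdownOnce() {
    if (!shutdownRequestSent) {
        shutdownRequestSent = true;
        maybeSendShutdown();
    }
}
{code}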

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>        Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Affects Version/s: 3.6.1

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>        Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Attachment: streams-3.zip

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>        Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip, streams-3.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Attachment: streams-2.zip

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip, streams-2.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16241:

Attachment: streams-1.zip

> Kafka Streams hits IllegalStateException trying to recycle a task
> -
>
> Key: KAFKA-16241
> URL: https://issues.apache.org/jira/browse/KAFKA-16241
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
> Attachments: streams-1.zip
>
>
> Running with EOS-v2 (not sure if relevant or not) and hitting:
> {code:java}
> [2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
> stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
> cleanly. Attempting to close remaining tasks before re-throwing: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> java.lang.IllegalStateException: Illegal state RESTORING while recycling 
> active task 1_0
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
>     at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
>     at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
>     at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>  {code}
> Logs of all three KS instances attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16241) Kafka Streams hits IllegalStateException trying to recycle a task

2024-02-09 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16241:
---

 Summary: Kafka Streams hits IllegalStateException trying to 
recycle a task
 Key: KAFKA-16241
 URL: https://issues.apache.org/jira/browse/KAFKA-16241
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Running with EOS-v2 (not sure if relevant or not) and hitting:
{code:java}
[2024-02-08 20:57:42,325] ERROR [i-0fede2697f39580f9-StreamThread-1] 
stream-thread [i-0fede2697f39580f9-StreamThread-1] Failed to recycle task 1_0 
cleanly. Attempting to close remaining tasks before re-throwing: 
(org.apache.kafka.streams.processor.internals.TaskManager)
java.lang.IllegalStateException: Illegal state RESTORING while recycling active 
task 1_0
    at 
org.apache.kafka.streams.processor.internals.StreamTask.prepareRecycle(StreamTask.java:582)
    at 
org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createStandbyTaskFromActive(StandbyTaskCreator.java:125)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.convertActiveToStandby(TaskManager.java:675)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.closeAndRecycleTasks(TaskManager.java:651)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:350)
    at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1381)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:315)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:469)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:478)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:389)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:564)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1220)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:954)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
 {code}
Logs of all three KS instances attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



Re: [DISCUSS] KIP-877: Mechanism for plugins and connectors to register metrics

2024-02-08 Thread Matthias J. Sax

Still need to digest the KIP, but one thing coming to mind:

Instead of requiring existing interfaces to implement `Closeable`, would 
it make sense to make `Monitorable extends Closeable` to sidestep this issue?
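
A rough sketch of that shape, just to illustrate the idea (names follow this
discussion, not a final API; `withMetrics` is the method name proposed in
the KIP thread):

    // Plugins would inherit close() from Closeable instead of each plugin
    // interface having to add it separately.
    public interface Monitorable extends java.io.Closeable {
        void withMetrics(PluginMetrics metrics);
    }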



-Matthias

On 1/25/24 9:03 AM, Mickael Maison wrote:

Hi Luke,

The reasons vary for each plugin; I've added details for most plugins in
the table.
The plugins without an explanation are all from Streams. I admit I
don't know these interfaces well enough to decide whether it makes sense
to make them closeable and instrument them. It would be nice to get some
input from Streams contributors on this.

Thanks,
Mickael

On Thu, Jan 25, 2024 at 5:50 PM Mickael Maison  wrote:


Hi Tom,

Thanks for taking a look at the KIP!

1. Yes I considered several names (see the previous messages in the
discussion). KIP-608, which this KIP supersedes, used "monitor()" for
the method name. I find "withMetrics()" to be nicer due to the way the
method should be used. That said, I'm not attached to the name, so if
more people prefer "monitor()", or can come up with a better name, I'm
happy to make the change. I updated the javadoc to clarify the usage
and mention when to close the PluginMetrics instance.

2. Yes, I added a note to the PluginMetrics interface.

3. I used this exception to follow the behavior of Metrics.addMetric(),
which throws IllegalArgumentException if a metric with the same name
already exists.

4. I added details to the javadoc

Thanks,
Mickael


On Thu, Jan 25, 2024 at 10:32 AM Luke Chen  wrote:


Hi Mickael,

Thanks for the KIP.
The motivation and solution make sense to me.

Just one question:
If we could extend `Closeable` for the Converter plugin, why don't we do that
for the "Unsupported Plugins" without a close method?
I'm not saying we must do that in this KIP, but maybe you could add the
reason in the "rejected alternatives" section.

Thanks.
Luke

On 01/25/2024 9:56 AM +08 Tom Bentley  wrote:


Hi Mickael,

Thanks for the KIP! I can tell a lot of thought went into this. I have a
few comments, but they're all pretty trivial and aimed at making the
correct use of this API clearer to implementors.

1. Configurable and Reconfigurable both use a verb in the imperative mood
for their method name. Monitorable doesn't, which initially seemed a bit
inconsistent to me, but I think your intention is to allow plugins to
merely retain a reference to the PluginMetrics, and allow registering
metrics at any later point? If that's the case you could add something
like "Plugins can register and unregister metrics using the given
PluginMetrics at any point in their lifecycle prior to their close method
being called" to the javadoc to make clear how this can be used.
2. I assume PluginMetrics will be thread-safe? We should document that as
part of the contract.
3. I don't think IAE is quite right for duplicate metrics. In this case
the arguments themselves are fine, it's the current state of the
PluginMetrics which causes the problem. If the earlier point about
plugins being allowed to register and unregister metrics at any point is
correct then this exception could be thrown after configuration time.
That being the case I think a new exception type might be clearer.
4. You define some semantics for PluginMetrics.close(): It might be a
good idea to override the inherited method and add that as javadoc.
5. You say "It will be the responsibility of the plugin that creates
metrics to call close() of the PluginMetrics instance they were given to
remove their metrics." But you don't provide any guidance to users about
when they need to do this. I guess that they should be doing this in
their plugin's close method (and that's why you're only adding
Monitorable to plugins which implement Closeable and AutoCloseable), but
if that's the case then let's say so in the Javadoc.

Thanks again,

Tom

On Wed, 13 Dec 2023 at 06:09, Mickael Maison wrote:


Hi,

I've not received any feedback since I updated the KIP.
I'll wait a few more days and if there's no further feedback I'll start a vote.


Re: Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias J. Sax

Josep,

thanks for helping with this. I was also wondering whether we might need a KIP 
for this change. Since you had the same thought, I would say: yes, let's 
do a KIP.


@Matthias: can you prepare a KIP? You can read up on the details on the 
wiki page: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


If you have any questions about the process, please let us know.

Thanks for pushing this forward!


-Matthias

On 2/8/24 8:08 AM, Matthias Berndt wrote:

Hey Josep et al,

I've created a ticket regarding this.
https://issues.apache.org/jira/browse/KAFKA-16237

All the best,
Matthias

On Thu, Feb 8, 2024 at 11:42, Josep Prat wrote:


Go ahead and ask for a JIRA and Wiki account (Confluence). Let us know when
your accounts are created and we'll properly set them up so you can create
tickets and assign them to yourself.

Best,

On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt wrote:


Thanks Josep, I've applied for a JIRA account and addressed your
review comments.

On Thu, Feb 8, 2024 at 09:19, Josep Prat wrote:


Hi Matthias,

I think for this particular case it would be worth creating a JIRA ticket
for this as it's a new "feature".
Regarding the change itself, I think we need to clarify how the release
process would work. Right now, the script `gradlewAll` is used (which
basically runs the build with Scala version 2.12 and 2.13). If I
understand your changes correctly, we would need to run the build 3 times
(see the sketch below):
- 1 with property scalaVersion 2.12
- 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
- 1 with scalaVersion 2.13 and streamsScalaVersion 3.1

I think we should document this and discuss when to have this feature. If
I remember correctly from when I tried to update Kafka to Scala 3, the
idea was to push this to a Kafka 4.0 version because we didn't want to
maintain more than 2 Scala versions at the same time. I would encourage,
if not having a KIP, at least opening up a [DISCUSS] thread to clarify
some of these points.
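
For concreteness, those three runs would look roughly like this
(`-PscalaVersion` is an existing build property; `-PstreamsScalaVersion` is
the property name used in the PR and is an assumption here):

    ./gradlew build -PscalaVersion=2.12
    ./gradlew build -PscalaVersion=2.13 -PstreamsScalaVersion=2.13
    ./gradlew build -PscalaVersion=2.13 -PstreamsScalaVersion=3.1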

I'll add some feedback on the PR itself regarding the changes.

Best,

On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt <matthias.ber...@ttmzero.com> wrote:


Hi Matthias J., Hi Lucas, Hi Josep,

Thank you for your encouraging responses regarding a Scala 3 port of
Kafka-Streams-Scala, and apologies for the late response from my side.
I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
retaining support for 2.13 and 2.12). Almost no changes to the code
were required and the tests also pass. Please take a look and let me
know what you think :-)
https://github.com/apache/kafka/pull/15338

All the best
Matthias

On Thu, Feb 1, 2024 at 16:35, Josep Prat wrote:


Hi,

For reference, prior work on this:
https://github.com/apache/kafka/pull/11350
https://github.com/apache/kafka/pull/11432

Best,

On Thu, Feb 1, 2024, 15:55 Lucas Brutschy wrote:


Hi Matthiases,

I know Scala 2 fairly well, so I'd be happy to review changes that add
Scala 3 support. However, as Matthias S. said, it has to be driven by
people who use Scala day-to-day, since I believe most Kafka Streams
committers are working with Java.

Rewriting the tests to not use EmbeddedKafkaCluster seems like a large
undertaking, so option 1 is the first thing we should explore.

I don't have any experience with Scala 3 migration topics, but on the
Scala website it says:

The first piece of good news is that the Scala 3 compiler is able to
read the Scala 2.13 Pickle format and thus it can type check code that
depends on modules or libraries compiled with Scala 2.13.

One notable example is the Scala 2.13 library. We have indeed decided
that the Scala 2.13 library is the official standard library for Scala 3.

So wouldn't that mean that we are safe in terms of standard library
upgrades if we use core_2.13 in the tests?

Cheers,
Lucas


On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax wrote:


Thanks for raising this. The `kafka-streams-scala` module seems to be an
important feature for Kafka Streams and I am generally in favor of your
proposal to add Scala 3 support. However, I am personally no Scala
person and it sounds like quite some overhead.

If you are willing to drive and own this initiative, happy to support
you to the extent I can.

About the concrete proposal: my understanding is that :core will move
off Scala long-term (not 100% sure what the timeline is, but new
modules are written in Java only). Thus, down the road the compatibility
issue would go away naturally, but it's unclear when.

Thus, if we can test kafka-streams-scala_3 with core_2.13 it seems we
could add support for Scala 3 now, taking the risk that it might break
in the future if the migration off Scala from core is not fast enough.

For proposal (2), I don't think that it would be easily possible for
unit/integration tests. We could fall back to system tests though, but
they would be much more heavy weight of course.

[jira] [Updated] (KAFKA-16237) Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16237:

Labels: needs-kip  (was: )

> Kafka-Streams-Scala for Scala 3
> ---
>
> Key: KAFKA-16237
> URL: https://issues.apache.org/jira/browse/KAFKA-16237
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias Berndt
>Assignee: Matthias Berndt
>Priority: Major
>  Labels: needs-kip
>
> Kafka-Streams-Scala is currently not available for Scala 3. The required 
> changes to compile the codebase with Scala 3 are trivial, so it's mostly 
> about how the build system needs to change.
> I've created a PR to enable separate configuration of the Scala version for 
> the Kafka Core and Kafka-Streams-Scala.
> [https://github.com/apache/kafka/pull/15338]
> This is necessary because the Kafka-Streams-Scala Tests depend on Core, which 
> hasn't been ported to Scala 3 yet. The most important thing to figure out is 
> probably how CI and publishing should work – I'm not familiar with Kafka's CI 
> setup, so I'm going to need some input from the Kafka team regarding this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16237) Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815763#comment-17815763
 ] 

Matthias J. Sax commented on KAFKA-16237:
-

Thanks for filing the ticket and working on a PR. – I added you to the list of 
contributors and assigned the ticket to you (you can now also self-assign 
tickets).

> Kafka-Streams-Scala for Scala 3
> ---
>
> Key: KAFKA-16237
> URL: https://issues.apache.org/jira/browse/KAFKA-16237
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias Berndt
>Assignee: Matthias Berndt
>Priority: Major
>
> Kafka-Streams-Scala is currently not available for Scala 3. The required 
> changes to compile the codebase with Scala 3 are trivial, so it's mostly 
> about how the build system needs to change.
> I've created a PR to enable separate configuration of the Scala version for 
> the Kafka Core and Kafka-Streams-Scala.
> [https://github.com/apache/kafka/pull/15338]
> This is necessary because the Kafka-Streams-Scala Tests depend on Core, which 
> hasn't been ported to Scala 3 yet. The most important thing to figure out is 
> probably how CI and publishing should work – I'm not familiar with Kafka's CI 
> setup, so I'm going to need some input from the Kafka team regarding this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16237) Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16237:
---

Assignee: Matthias Berndt

> Kafka-Streams-Scala for Scala 3
> ---
>
> Key: KAFKA-16237
> URL: https://issues.apache.org/jira/browse/KAFKA-16237
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias Berndt
>Assignee: Matthias Berndt
>Priority: Major
>
> Kafka-Streams-Scala is currently not available for Scala 3. The required 
> changes to compile the codebase with Scala 3 are trivial, so it's mostly 
> about how the build system needs to change.
> I've created a PR to enable separate configuration of the Scala version for 
> the Kafka Core and Kafka-Streams-Scala.
> [https://github.com/apache/kafka/pull/15338]
> This is necessary because the Kafka-Streams-Scala Tests depend on Core, which 
> hasn't been ported to Scala 3 yet. The most important thing to figure out is 
> probably how CI and publishing should work – I'm not familiar with Kafka's CI 
> setup, so I'm going to need some input from the Kafka team regarding this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16236) Interactive Query v2 does not support Global KTables

2024-02-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16236.
-
Resolution: Duplicate

Thanks for filing this ticket. It's a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-13523 – so yes, we want to close 
this gap, but I don't think it's on the roadmap yet...

> Interactive Query v2 does not support Global KTables
> 
>
> Key: KAFKA-16236
> URL: https://issues.apache.org/jira/browse/KAFKA-16236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Christian Zügner
>Priority: Major
>
> Query Global KTable using IQ v2 API is currently not supported:
> java.lang.IllegalArgumentException: Cannot get result for failed query. 
> Failure is UNKNOWN_QUERY_TYPE: Global stores do not yet support the 
> KafkaStreams#query API. Use KafkaStreams#store instead.
> I would kindly ask if this feature could be implemented for GlobalKTable 
> as well?
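Until that gap is closed, the legacy store API still works for global stores;
a minimal sketch (the store name is assumed):
{code:java}
// Workaround: query a global state store via KafkaStreams#store.
ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(
    StoreQueryParameters.fromNameAndType(
        "global-store-name",                      // assumed store name
        QueryableStoreTypes.keyValueStore()));
String value = store.get("some-key");
{code}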



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16236) Interactive Query v2 does not support Global KTables

2024-02-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815760#comment-17815760
 ] 

Matthias J. Sax edited comment on KAFKA-16236 at 2/8/24 4:52 PM:
-

Thanks for filing this ticket. It's a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-13523 – so yes, we want to close 
this gap, but I don't think it's on the roadmap yet...


was (Author: mjsax):
Thanks for filing this tickets. It's duplicate of 
https://issues.apache.org/jira/browse/KAFKA-13523 – so yes, we want to close 
this gap, but I don't think it's on the roadmap already...

> Interactive Query v2 does not support Global KTables
> 
>
> Key: KAFKA-16236
> URL: https://issues.apache.org/jira/browse/KAFKA-16236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Christian Zügner
>Priority: Major
>
> Query Global KTable using IQ v2 API is currently not supported:
> java.lang.IllegalArgumentException: Cannot get result for failed query. 
> Failure is UNKNOWN_QUERY_TYPE: Global stores do not yet support the 
> KafkaStreams#query API. Use KafkaStreams#store instead.
> I would kindly ask if this feature could be implemented for GlobalKTable 
> as well?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815759#comment-17815759
 ] 

Matthias J. Sax commented on KAFKA-14747:
-

It seems [~kma] never found time to pick it up – let's give him a few 
days to respond – if he does not respond, feel free to pick it up.

About testing: not sure from the top of my head; it could also just be a test 
gap. – IIRC (did not check the code), we don't use the dropped-record sensor 
for this case, and thus it might not have been important to test; if we add 
tracking of dropped records with this ticket, it seems reasonable to also start 
testing the sensor (if possible).

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: If the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record if this happens. We should 
> consider recording this using the existing "dropped record" sensor or maybe 
> add a new sensor.
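In code terms, the guard works roughly like this (a hypothetical sketch; the
helper names are illustrative, not the actual subscription-response processor
code):
{code:java}
// Only process a join response whose hash matches the current LHS record.
// A mismatch means the LHS was updated in the meantime, so the response is
// stale -- this drop is what the ticket proposes to record.
long[] currentHash = hash(serialize(currentLeftValue));
if (Arrays.equals(currentHash, response.originalValueHash())) {
    forward(join(currentLeftValue, response.rightValue()));
} else {
    droppedRecordsSensor.record(); // proposed new instrumentation
}
{code}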



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re-key by multiple properties without composite key

2024-02-07 Thread Matthias J. Sax

Using the DSL, this sounds about right.

I am not worried about the complexity -- KS can handle it, and it's not 
uncommon to end up with such topologies.


You might be able to cut down on complexity by not using the DSL, but 
the Processor API. It gives you more control, and thus you might be able 
to optimize the overall topology.


Maybe inspect the details of `TopologyDescription` to spot 
inefficiencies of the DSL generated Topology that might give you an idea 
how much you could optimize using Processor API (to estimate if it would 
be worth the effort).
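
For example (nothing assumed here beyond a `StreamsBuilder` named `builder`):

    Topology topology = builder.build();
    // describe() lists sub-topologies, processors, and state stores, which
    // makes redundant repartition steps easy to spot.
    System.out.println(topology.describe());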


It's hard to tell w/o knowing the details. It could also be just an 
inherently complex problem, and the DSL program is already as efficient 
as it gets...


Of course, there might also be ways to play with configs to cut down on 
latency to some extent, if e2e latency is your main concern. Again, I 
don't know the use case: for many cases, sub-second latency is actually 
sufficient.


HTH.

-Matthias

On 2/7/24 7:41 AM, Karsten Stöckmann wrote:

Sorry for being late with the response - I've been quite busy working
on our Streams application lately.

That leads me back to my initial question. The Folder class contains
multiple fields with FK pointing to the Person table, all of them with
different semantics (customer, billing address, etc). So in order to
find _all_ folders related to a particular person regardless of its
role, I guess I need to

a) re-key the folder table on each person FK independently and then
b) outer join the result tables.

The entire topology is insanely complex, I've got around 10 tables
with different levels of nesting (e.g. folder -- 1:n --> dependency a
-- 1:n --> dependency b) that all need to be aggregated and in the end
re-keyed to person IDs in order to build an aggregate person. There
are 14 sub topologies... - measuring the e2e latency shows values
around 600ms which seems rather high to me. Does that sound crazy? ;)

Best wishes
Karsten

On Thu, Feb 1, 2024 at 19:02, Matthias J. Sax wrote:


I see. You need to ensure that you get _all_ Person.

For this case, I guess you are right. You would need to first aggregate
the folder per person:

KTable allPersonFolders =
  folder.groupBy((...) -> (folder.customerId, ...))
.aggregate(...)

And in a second step, do a left join:

result = personTable.leftJoin(allPersonFolders,...)
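
A slightly more concrete sketch of that two-step shape (the generic types and
the FolderList/PersonWithFolders helpers are assumed for illustration;
FolderList.add/remove are assumed to return the updated list):

    KTable<Long, FolderList> allPersonFolders = folderTable
        .groupBy((folderId, folder) ->
            KeyValue.pair(folder.customerId, folder))        // re-key by person FK
        .aggregate(
            FolderList::new,
            (personId, folder, agg) -> agg.add(folder),      // adder
            (personId, folder, agg) -> agg.remove(folder));  // subtractor

    // The left join keeps persons that have no folders at all.
    KTable<Long, PersonWithFolders> result =
        personTable.leftJoin(allPersonFolders, PersonWithFolders::new);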



Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.


Not sure if I can follow. What do you mean by "crashed"? -- For tables,
there is no `selectKey()` nor a `repartition()`, as explained in my
previous reply. However, doing a `table.groupBy(...)` will set a new key
and repartition the data to your needs.


-Matthias


On 2/1/24 1:12 AM, Karsten Stöckmann wrote:

Thanks so much for taking a look. An FK-table-table join is an inner
join, which implies there would be no Person entities without associated
Folders. Unfortunately, that's not the case. That led me to an
attempt of re-keying the Folder topic by each of the three possible
foreign keys in order to be able to left join Persons against each of
the three re-keyed KTables to build an eventual Person aggregation
containing all possible Folders associated in any way.

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.

Best wishes,
Karsten

On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax wrote:


Thanks for the details. This does make sense.

So it seems you can read all topics as tables (i.e., `builder.table("topic")`
-- no need to do `builder.stream().toTable()`).

And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
 folderTable
 .join(personTable, (folderId, folder) -> folder.customerId, ...)
 .groupBy((...) -> (personId, ...))
 .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that
tells the join to use `customerId` from the `folderTable` to lookup the
person from personTable.

Think of `folderTable` as the fact table and `personTable` as the dimension table.


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
 Long id;
 String firstname;
 String lastname;
 // some content
}

class Folder {
 Long id;
 String folderNumber;
 // some other content
 Long customerId; // FK, points to Person.id
 Long billingAddressId; // FK, also points to Person.id
}

[jira] [Commented] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2024-02-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815404#comment-17815404
 ] 

Matthias J. Sax commented on KAFKA-13292:
-

Sounds like a question about Spring... For a plain Java application using a 
`KafkaProducer` you would use a `try-catch-block` to handle this case – in the 
end, you would need to `close()` the producer and create a new producer 
instance to recover from the error w/o letting the thread die to begin with.

Thus, I don't know, as I am not familiar with Spring.
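
A minimal sketch of that plain-Java pattern (configs and record details are
assumed):
{code:java}
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (KafkaException e) {
    // e.g. InvalidPidMappingException: the producer cannot recover in place,
    // so close it and create a fresh instance instead of letting the thread die.
    producer.close();
    producer = new KafkaProducer<>(producerConfigs);
    producer.initTransactions();
}
{code}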

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly after 7 days of starting this application, I seem to be getting the 
> following exception and the application shuts down, without processing 
> any more messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021

[jira] [Commented] (KAFKA-12823) Remove Deprecated method KStream#through

2024-02-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815089#comment-17815089
 ] 

Matthias J. Sax commented on KAFKA-12823:
-

Look for ticket labeled "beginner" or "newbie": 
https://issues.apache.org/jira/browse/KAFKA-16209?jql=project%20%3D%20KAFKA%20AND%20labels%20in%20(Beginner%2C%20beginner%2C%20newbie%2C%20%22newbie%2B%2B%22)%20ORDER%20BY%20created%20DESC%2C%20priority%20DESC%2C%20updated%20DESC
 

> Remove Deprecated method KStream#through
> 
>
> Key: KAFKA-12823
> URL: https://issues.apache.org/jira/browse/KAFKA-12823
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The method through in Java and Scala class KStream was deprecated in version 
> 2.6:
>  * org.apache.kafka.streams.scala.kstream.KStream#through
>  * org.apache.kafka.streams.kstream.KStream#through(java.lang.String)
>  * org.apache.kafka.streams.kstream.KStream#through(java.lang.String, 
> org.apache.kafka.streams.kstream.Produced)
>  
> See KAFKA-10003 and KIP-221



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-02-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14567:

Description: 
Running a Kafka Streams application with EOS-v2.

We first see a `ProducerFencedException`. After the fencing, the fenced thread 
crashed, resulting in a non-recoverable error:
{quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
task 1_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_2, processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
partition=2, offset=539776276, stacktrace=java.lang.IllegalStateException: 
TransactionalId stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: 
Invalid transition attempted from state FATAL_ERROR to state ABORTABLE_ERROR
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
at 
org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
at 
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
Caused by: java.lang.IllegalStateException: TransactionalId 
stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
attempted from state FATAL_ERROR to state ABORTABLE_ERROR

[jira] [Resolved] (KAFKA-16221) IllegalStateException from Producer

2024-02-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16221.
-
Resolution: Fixed

> IllegalStateException from Producer
> ---
>
> Key: KAFKA-16221
> URL: https://issues.apache.org/jira/browse/KAFKA-16221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>        Reporter: Matthias J. Sax
>Priority: Critical
> Fix For: 3.7.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
> internal TX state transition and the producer is now throwing an 
> IllegalStateException in situations it did swallow an internal error before.
> This change surfaces a bug in Kafka Streams: Kafka Streams calls 
> `abortTransaction()` blindly when a task is closed dirty, even if the 
> Producer is already in an internal fatal state. However, if the Producer is 
> in a fatal state, Kafka Streams should skip `abortTransaction` and only 
> `close()` the Producer when closing a task dirty.
> The bug is surfaced after `commitTransaction()` did timeout or after an 
> `InvalidProducerEpochException` from a `send()` call, leading to the call to 
> `abortTransaction()` – Kafka Streams does not track right now if a commit-TX 
> is in progress.
> {code:java}
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) {code}
> and
> {code:java}
> [2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
> [i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
> sending record to topic joined-counts for task 1_2 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since 
> the producer is fenced, indicating the task may be migrated out 
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> // followed by
> [2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
> clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
> transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] 
> Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
> at 
> org.apache.kafka.clients.produc

[jira] [Commented] (KAFKA-16221) IllegalStateException from Producer

2024-02-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814437#comment-17814437
 ] 

Matthias J. Sax commented on KAFKA-16221:
-

Marking this as resolved for 3.7 for now – we might want to re-open this ticket 
after 3.7.0 is released for a proper fix.

> IllegalStateException from Producer
> ---
>
> Key: KAFKA-16221
> URL: https://issues.apache.org/jira/browse/KAFKA-16221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>        Reporter: Matthias J. Sax
>Priority: Critical
> Fix For: 3.7.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
> internal TX state transition and the producer is now throwing an 
> IllegalStateException in situations it did swallow an internal error before.
> This change surfaces a bug in Kafka Streams: Kafka Streams calls 
> `abortTransaction()` blindly when a task is closed dirty, even if the 
> Producer is already in an internal fatal state. However, if the Producer is 
> in a fatal state, Kafka Streams should skip `abortTransaction` and only 
> `close()` the Producer when closing a task dirty.
> The bug is surfaced after `commitTransaction()` did timeout or after an 
> `InvalidProducerEpochException` from a `send()` call, leading to the call to 
> `abortTransaction()` – Kafka Streams does not track right now if a commit-TX 
> is in progress.
> {code:java}
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) {code}
> and
> {code:java}
> [2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
> [i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
> sending record to topic joined-counts for task 1_2 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since 
> the producer is fenced, indicating the task may be migrated out 
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> // followed by
> [2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
> clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
> transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] 
> Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Se

[jira] [Updated] (KAFKA-16221) IllegalStateException from Producer

2024-02-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16221:

Fix Version/s: 3.7.0

> IllegalStateException from Producer
> ---
>
> Key: KAFKA-16221
> URL: https://issues.apache.org/jira/browse/KAFKA-16221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>        Reporter: Matthias J. Sax
>Priority: Critical
> Fix For: 3.7.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
> internal TX state transition and the producer is now throwing an 
> IllegalStateException in situations it did swallow an internal error before.
> This change surfaces a bug in Kafka Streams: Kafka Streams calls 
> `abortTransaction()` blindly when a task is closed dirty, even if the 
> Producer is already in an internal fatal state. However, if the Producer is 
> in a fatal state, Kafka Streams should skip `abortTransaction` and only 
> `close()` the Producer when closing a task dirty.
> The bug is surfaced after `commitTransaction()` did timeout or after an 
> `InvalidProducerEpochException` from a `send()` call, leading to the call to 
> `abortTransaction()` – Kafka Streams does not track right now if a commit-TX 
> is in progress.
> {code:java}
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) {code}
> and
> {code:java}
> [2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
> [i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
> sending record to topic joined-counts for task 1_2 due to:
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> Written offsets would not be recorded and no more records would be sent since 
> the producer is fenced, indicating the task may be migrated out 
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
> attempted to produce with an old epoch.
> // followed by
> [2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
> i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
> clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
> transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] 
> Aborting producer batches due to 
> fatal error (org.apache.kafka.clients.producer.internals.Sender)
> java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
> at 
> org.apache.kafka.clients.produc

[jira] [Updated] (KAFKA-16221) IllegalStateException from Producer

2024-02-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16221:

Description: 
https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
internal TX state transition and the producer is now throwing an 
IllegalStateException in situations it did swallow an internal error before.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. However, if the Producer is in a fatal 
state, Kafka Streams should skip `abortTransaction` and only `close()` the 
Producer when closing a task dirty.

The bug is surfaced after `commitTransaction()` did timeout or after an 
`InvalidProducerEpochException` from a `send()` call, leading to the call to 
`abortTransaction()` – Kafka Streams does not track right now if a commit-TX is 
in progress.
{code:java}
java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
because the previous call to `commitTransaction` timed out and must be retried
at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1203)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:326)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274) 
{code}
and
{code:java}
[2024-01-16 04:19:32,584] ERROR [kafka-producer-network-thread | 
i-01aea6907970b1bf6-StreamThread-1-producer] stream-thread 
[i-01aea6907970b1bf6-StreamThread-1] stream-task [1_2] Error encountered 
sending record to topic joined-counts for task 1_2 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.

// followed by

[2024-01-16 04:19:32,587] ERROR [kafka-producer-network-thread | 
i-01aea6907970b1bf6-StreamThread-1-producer] [Producer 
clientId=i-01aea6907970b1bf6-StreamThread-1-producer, 
transactionalId=stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1] 
Aborting producer batches due to fatal error 
(org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: TransactionalId 
stream-soak-test-bbb995dc-1ba2-41ed-8791-0512ab4b904d-1: Invalid transition 
attempted from state FATAL_ERROR to state ABORTABLE_ERROR
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:996)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:451)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:664)
at 
org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:669)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:835)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:819)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:771)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:627)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:612)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$8(Sender.java:917)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:460)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:252)
at java.lang.Thread.run(Thread.java:750) {code}
If the Producer throws an IllegalStateException on `abortTransaction()` Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread dies. 
However, Kafka Streams is actually in a state from which it can recover, and 
thus should not let the StreamThread die but carry forward (by not calling 
`abortTransaction()` and moving forward with the dirty close of the task).
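
For illustration, a minimal sketch of the intended guard (this is not the 
actual Streams-internal code; `producerIsFatallyErrored` is a hypothetical 
flag standing in for the commit/fatal-state tracking Streams would need):
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;

// Hedged sketch: on a dirty close, skip the abort if the producer is
// already in a fatal state, but always close it to release resources.
void closeTaskDirty(final Producer<byte[], byte[]> producer,
                    final boolean producerIsFatallyErrored) {
    try {
        if (!producerIsFatallyErrored) {
            producer.abortTransaction();
        }
    } finally {
        producer.close(Duration.ZERO);
    }
}
{code}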

[jira] [Updated] (KAFKA-16221) IllegalStateException from Producer

2024-02-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16221:

Description: 
https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
internal TX state transition and the producer is now throwing an 
IllegalStateException in situations it did swallow an internal error before.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. However, if the Producer is in a fatal 
state, Kafka Streams should skip `abortTransaction` and only `close()` the 
Producer when closing a task dirty.

The bug is surfaced after `commitTransaction()` did timeout and another error 
happens leading to the call to `abortTransaction()` – Kafka Streams does not 
track right now if a commit-TX is in progress.

If the Producer throws an IllegalStateException on `abortTransaction()` Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread dies. 
However, Kafka Streams is actually in a state from which it can recover, and 
thus should not let the StreamThread die but carry forward (by not calling 
`abortTransaction()` and moving forward with the dirty close of the task).

  was:
https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
internal TX state transition and the producer is now throwing an 
IllegalStateException in situations it did swallow an internal error before.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. However, if the Producer is in a fatal 
state, Kafka Streams should skip `abortTransaction` and only `close()` the 
Producer when closing a task dirty.

If the Producer throws an IllegalStateException on `abortTransaction()` Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread dies. 
However, Kafka Streams is actually in a state from which it can recover, and 
thus should not let the StreamThread die but carry forward (by not calling 
`abortTransaction()` and moving forward with the dirty close of the task).


> IllegalStateException from Producer
> ---
>
> Key: KAFKA-16221
> URL: https://issues.apache.org/jira/browse/KAFKA-16221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>
> https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
> internal TX state transition and the producer is now throwing an 
> IllegalStateException in situations it did swallow an internal error before.
> This change surfaces a bug in Kafka Streams: Kafka Streams calls 
> `abortTransaction()` blindly when a task is closed dirty, even if the 
> Producer is already in an internal fatal state. However, if the Producer is 
> in a fatal state, Kafka Streams should skip `abortTransaction` and only 
> `close()` the Producer when closing a task dirty.
> The bug is surfaced after `commitTransaction()` did timeout and another error 
> happens leading to the call to `abortTransaction()` – Kafka Streams does not 
> track right now if a commit-TX is in progress.
> If the Producer throws an IllegalStateException on `abortTransaction()` Kafka 
> Streams treats this exception ("correctly") as fatal, and the StreamThread dies. 
> However, Kafka Streams is actually in a state from which it can recover, 
> and thus should not let the StreamThread die but carry forward (by not calling 
> `abortTransaction()` and moving forward with the dirty close of the task).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16221) IllegalStateException from Producer

2024-02-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16221:
---

 Summary: IllegalStateException from Producer
 Key: KAFKA-16221
 URL: https://issues.apache.org/jira/browse/KAFKA-16221
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.0
Reporter: Matthias J. Sax


https://issues.apache.org/jira/browse/KAFKA-14831 fixed a producer bug about 
internal TX state transition and the producer is now throwing an 
IllegalStateException in situations it did swallow an internal error before.

This change surfaces a bug in Kafka Streams: Kafka Streams calls 
`abortTransaction()` blindly when a task is closed dirty, even if the Producer 
is already in an internal fatal state. However, if the Producer is in a fatal 
state, Kafka Streams should skip `abortTransaction` and only `close()` the 
Producer when closing a task dirty.

If the Producer throws an IllegalStateException on `abortTransaction()` Kafka 
Streams treats this exception ("correctly") as fatal, and the StreamThread dies. 
However, Kafka Streams is actually in a state from which it can recover, and 
thus should not let the StreamThread die but carry forward (by not calling 
`abortTransaction()` and moving forward with the dirty close of the task).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16217) Transactional producer stuck in IllegalStateException during close

2024-02-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16217:

Component/s: producer 

> Transactional producer stuck in IllegalStateException during close
> --
>
> Key: KAFKA-16217
> URL: https://issues.apache.org/jira/browse/KAFKA-16217
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Calvin Liu
>Priority: Major
>
> The producer is stuck during the close. It keeps retrying to abort the 
> transaction but it never succeeds. 
> {code:java}
> [ERROR] 2024-02-01 17:21:22,804 [kafka-producer-network-thread | 
> producer-transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> org.apache.kafka.clients.producer.internals.Sender run - [Producer 
> clientId=producer-transaction-ben
> ch-transaction-id-f60SGdyRQGGFjdgg3vUgKg, 
> transactionalId=transaction-bench-transaction-id-f60SGdyRQGGFjdgg3vUgKg] 
> Error in kafka producer I/O thread while aborting transaction:
> java.lang.IllegalStateException: Cannot attempt operation `abortTransaction` 
> because the previous call to `commitTransaction` timed out and must be retried
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleCachedTransactionRequestResult(TransactionManager.java:1138)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:323)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:274)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> at org.apache.kafka.common.utils.KafkaThread.run(KafkaThread.java:66) 
> {code}
> With the additional log, I found the root cause. If the producer is in a bad 
> transaction state (in my case, the TransactionManager.pendingTransition was 
> set to commitTransaction and did not get cleaned) and then the producer calls 
> close and tries to abort the existing transaction, the producer will get 
> stuck in the transaction abort. It is related to the fix 
> [https://github.com/apache/kafka/pull/13591].
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re-key by multiple properties without composite key

2024-02-01 Thread Matthias J. Sax

I see. You need to ensure that you get _all_ Person entities.

For this case, I guess you are right. You would need to first aggregate 
the folders per person:


KTable allPersonFolders =
folder.groupBy((...) -> (folder.customerId, ...))
  .aggregate(...)

And in a second step, do a left join:

result = personTable.leftJoin(allPersonFolders,...)
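
Spelled out with your Person/Folder classes, the two steps could look 
roughly like this (untested sketch; `builder`, the serdes, and the 
AggregatedPerson constructor are placeholders):

StreamsBuilder builder = new StreamsBuilder();
KTable<Long, Person> personTable = builder.table("person-topic");
KTable<Long, Folder> folderTable = builder.table("folder-topic");

// step 1: re-key folders by customerId; collect folder numbers per person
KTable<Long, ArrayList<String>> foldersPerPerson = folderTable
    .groupBy((folderId, folder) -> KeyValue.pair(folder.customerId, folder),
             Grouped.with(Serdes.Long(), folderSerde))
    .aggregate(
        ArrayList::new,
        (personId, folder, agg) -> { agg.add(folder.folderNumber); return agg; },    // adder
        (personId, folder, agg) -> { agg.remove(folder.folderNumber); return agg; }, // subtractor
        Materialized.with(Serdes.Long(), folderNumbersSerde));

// step 2: the left join keeps Persons that have no Folders at all
KTable<Long, AggregatedPerson> result = personTable.leftJoin(
    foldersPerPerson,
    (person, folders) -> new AggregatedPerson(person, folders)); // folders may be null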



Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.


Not sure if I can follow. What do you mean by "crashed"? -- For tables, 
there is no `selectKey()` nor a `repartition()`, as explained in my 
previous reply. However, doing a `table.groupBy(...)` will set a new key 
and repartition the data to your needs.



-Matthias


On 2/1/24 1:12 AM, Karsten Stöckmann wrote:

Thanks so much for taking a look. An FK-table-table join is an inner
join which implies there would be no Person entites without associated
Folders. Unfortunately, that's not the case. That lead me to an
attempt of re-keying the Folder topic by each of the three possible
foreign keys in order to be able to left join Persons against each of
the three re-keyed KTables to build an eventual Person aggregation
containing all possible Folders associated in any way.

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.

Best wishes,
Karsten

On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax wrote:


Thanks for the details. This does make sense.

So it seems you can read all topics as tables (i.e., builder.table("topic")
-- no need to do `builder.stream().toTable()`).

And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
folderTable
.join(personTable, (folderId, folder) -> folder.customerId, ...)
.groupBy((...) -> (personId, ...))
.aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that
tells the join to use `customerId` from the `folderTable` to look up the
person in personTable.

Think of `folderTable` as fact-table and `personTable` as dimension table.


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
Long id;
String firstname;
String lastname;
// some content
}

class Folder {
Long id;
String folderNumber;
// some other content
Long customerId; // FK, points to Person.id
Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys of Folder point to Person entities, yet with
different semantics. They're not composite keys but act independently.

Now assume I want to build an aggregate Person object containing
Folder.folderNumber of all folders associated with a Person entity,
regardless whether it acts as a customer or billing address. My
(naive) idea was to build re-keyed KTables by Folder.customerId and
Folder.billingAddressId and then joining / aggregating them with the
Person KTable in order to build something like this:

class AggregatedPerson {
Long id;
List<String> folderNumbers; // or even List<Folder>
// ...
}

(The latter supposed to be written to an output topic in order to
serve as input for Solr or ElasticSearch.)

Does this even make sense?



If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
i.e., the ID field, and thus the DSL does not offer you a repartition option.


So re-key means repartition? ATM the partition size of all input
topics is 1 as per Kafka UI, as I've specified no extra configuration
for them.

Best wishes,
Karsten

On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax wrote:



Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


Is this two independent FKs, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
i.e., the ID field, and thus the DSL does not offer you a repartition option.

You could read the topic as a KStream though, and provide a custom
`StreamPartitioner` for a `repartition()` operation. However, this is
also "dangerous" because for a KStream it's also assumed that it's
partitioned by its key, and you might break downstream DSL operators
with such a violation of the "contract".

Re: Re-key by multiple properties without composite key

2024-01-31 Thread Matthias J. Sax

Thanks for the details. This does make sense.

So it seems you can read all topics as tables (i.e., builder.table("topic") 
-- no need to do `builder.stream().toTable()`).


And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
  folderTable
  .join(personTable, (folderId, folder) -> folder.customerId, ...)
  .groupBy((...) -> (personId, ...))
  .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that 
tells the join to use `customerId` from the `folderTable` to look up the 
person in personTable.


Think of `folderTable` as fact-table and `personTable` as dimension table.


KS will take care of everything else under the hood automatically.
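
With types filled in, the join step alone would look roughly like this 
(sketch only; EnrichedFolder is a made-up value class, and the result 
keeps folderId as its key):

KTable<Long, Person> personTable = builder.table("person-topic");
KTable<Long, Folder> folderTable = builder.table("folder-topic");

// FK join: folder.customerId selects the matching row in personTable
KTable<Long, EnrichedFolder> enriched = folderTable.join(
    personTable,
    folder -> folder.customerId,
    (folder, person) -> new EnrichedFolder(folder, person));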


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
   Long id;
   String firstname;
   String lastname;
   // some content
}

class Folder {
   Long id;
   String folderNumber;
   // some other content
   Long customerId; // FK, points to Person.id
   Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys of Folder point to Person entities, yet with
different semantics. They're not composite keys but act independently.

Now assume I want to build an aggregate Person object containing
Folder.folderNumber of all folders associated with a Person entity,
regardless whether it acts as a customer or billing address. My
(naive) idea was to build re-keyed KTables by Folder.customerId and
Folder.billingAddressId and then joining / aggregating them with the
Person KTable in order to build something like this:

class AggregatedPerson {
   Long id;
   List<String> folderNumbers; // or even List<Folder>
   // ...
}

(The latter supposed to be written to an output topic in order to
serve as input for Solr or ElasticSearch.)

Does this even make sense?



If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
i.e., the ID field, and thus the DSL does not offer you a repartition option.


So re-key means repartition? ATM the partition size of all input
topics is 1 as per Kafka UI, as I've specified no extra configuration
for them.

Best wishes,
Karsten

On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax wrote:



Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


Is this two independent FKs, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic as a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by its primary key,
i.e., the ID field, and thus the DSL does not offer you a repartition option.

You could read the topic as a KStream though, and provide a custom
`StreamPartitioner` for a `repartition()` operation. However, this is
also "dangerous" because for a KStream it's also assumed that it's
partitioned by its key, and you might break downstream DSL operators
with such a violation of the "contract".

Looking into your solution:


.toTable()
 .groupBy(
 (key, value) -> KeyValue.pair(value.fk1(), value),
 Grouped.with(...))


This will set fk1 as the key, which seems not to align with your previous
comment that the key should stay the ID? (Same for fk2.)

Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
unclear what you actually try to do to begin with? It sounds like it's
overall a self-join of the input topic on fk1 and fk2?


-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:

Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
  private Long id;
  private String someContent;
  private Long fk1;
  private Long fk2;
  // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2? Note that the
resulting key should not be changed or turned into some kind of
composite key as it is used in later join operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 accordingly and then outer
joining both resulting (re-keyed) KTables.

KStream in = streamsBuilder.stream(topic, Consumed.with(...));

KTable rekeyedByFk1 = in
  .toTable()
  .groupBy(
  (key, value) -> KeyValue.pair(value.fk1(), value),
  G

[jira] [Commented] (KAFKA-12549) Allow state stores to opt-in transactional support

2024-01-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812993#comment-17812993
 ] 

Matthias J. Sax commented on KAFKA-12549:
-

[~high.lee] – this ticket is superseded by 
https://issues.apache.org/jira/browse/KAFKA-14412 which is already WIP.

> Allow state stores to opt-in transactional support
> --
>
> Key: KAFKA-12549
> URL: https://issues.apache.org/jira/browse/KAFKA-12549
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Right now Kafka Stream's EOS implementation does not make any assumptions 
> about the state store's transactional support. Allowing the state stores to 
> optionally provide transactional support can have multiple benefits. E.g., if 
> we add some APIs into the {{StateStore}} interface, like {{beginTxn}}, 
> {{commitTxn}} and {{abortTxn}}. Streams library can determine if these are 
> supported via an additional {{boolean transactional()}} API, and if yes, 
> these APIs can be used under both ALOS and EOS like the following (otherwise 
> just fall back to the normal processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
> If the state stores allow Streams to do something like above, we can have the 
> following benefits:
> * Reduce the duplicated records upon crashes for ALOS (note this is not EOS 
> still, but some middle-ground where uncommitted data within a state store 
> would not be retained if store.commitTxn failed).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes 
> for EOS. E.g., if a crash-failure happened between streams commit completes 
> and store.commitTxn. We can instead just roll-forward the transaction by 
> replaying the changelog from the second recent streams committed offset 
> towards the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping 
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see 
> below for reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself have natural transaction features (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction, and upon 
> `commitTxn` write the whole buffer as a batch to the underlying state store, 
> or just drop the whole buffer upon aborting. Then for interactive queries, 
> one can optionally only query the underlying store for committed data only.
> * Use a separate store as the transient persistent buffer. Upon `beginTxn` 
> create a new empty transient store, and upon `commitTxn` merge the store into 
> the underlying store. Same applies for interactive querying committed-only 
> data. This has a benefit compared with the one above that there's no memory 
> pressure even with long transactions, but incurs more complexity / 
> performance overhead with the separate persistent store.
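
For illustration, the opt-in API shape described above would look roughly as 
follows (sketch only -- no such interface was ever added to Kafka; the loop 
is the schematic one from the description):
{code:java}
import org.apache.kafka.streams.processor.StateStore;

// Hypothetical opt-in interface, mirroring the ticket's proposal.
public interface TransactionalStateStore extends StateStore {
    boolean transactional(); // probed by the runtime; false => normal path
    void beginTxn();
    void commitTxn();
    void abortTxn();
}

// Schematic per-thread processing loop from the ticket:
// 1. store.beginTxn();
// 2. store.put(...);    // during processing
// 3. streamsCommit();   // through the EOS protocol or not
// 4. store.commitTxn();
// 5. store.beginTxn();  // start the next txn
{code}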



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Kafka-Streams-Scala for Scala 3

2024-01-31 Thread Matthias J. Sax
Thanks for raising this. The `kafka-streams-scala` module seems to be an 
important feature for Kafka Streams and I am generally in favor of your 
proposal to add Scala 3 support. However, I am personally no Scala 
person and it sounds like quite some overhead.


If you are willing to drive and own this initiative, happy to support you 
to the extent I can.


About the concrete proposal: my understanding is that :core will move 
off Scala long-term (not 100% sure what the timeline is, but new modules 
are written in Java only). Thus, down the road the compatibility issue 
would go away naturally, but it's unclear when.


Thus, if we can test kafka-streams-scala_3 with core_2.13 it seems we 
could add support for Scala 3 now, taking the risk that it might break in 
the future, assuming the migration off Scala in core is not fast enough.


For proposal (2), I don't think that it would be easily possible for 
unit/integration tests. We could fall back to system tests though, but 
they would be much more heavy weight of course.


Might be good to hear from others. We might actually also want to do a 
KIP for this?



-Matthias

On 1/20/24 10:34 AM, Matthias Berndt wrote:

Hey there,

I'd like to discuss a Scala 3 port of the kafka-streams-scala library.
Currently, the build system is set up such that kafka-streams-scala
and core (i. e. kafka itself) are compiled with the same Scala
compiler versions. This is not an optimal situation because it means
that a Scala 3 release of kafka-streams-scala cannot happen
independently of kafka itself. I think this should be changed

The production codebase of kafka-streams-scala actually compiles just
fine on Scala 3.3.1 with two lines of trivial syntax changes. The
problem is with the tests. These use the `EmbeddedKafkaCluster` class,
which means that kafka is pulled into the classpath, potentially
leading to binary compatibility issues.
I can see several approaches to fixing this:

1. Run the kafka-streams-scala tests using the compatible version of
:core if one is available. Currently, this means that everything can
be tested (test kafka-streams-scala_2.12 using core_2.12,
kafka-streams-scala_2.13 using core_2.13 and kafka-streams-scala_3
using core_2.13, as these should be compatible), but when a new
scala-library version is released that is no longer compatible with
2.13, we won't be able to test that.
2. Rewrite the tests to run without EmbeddedKafkaCluster, instead
running the test cluster in a separate JVM or perhaps even a
container.

I'd be willing to get my hands dirty working on this, but before I
start I'd like to get some feedback from the Kafka team regarding the
approaches outlined above.

All the best
Matthias Berndt


Re: What does kafka streams groupBy does internally?

2024-01-30 Thread Matthias J. Sax

Did reply on SO.

-Matthias

On 1/24/24 2:18 AM, warrior2...@gmail.com wrote:
Let's say there's a topic in which chunks of different files are all 
mixed up, represented by a tuple `(FileId, Chunk)`.


Chunks of the same file can also be a little out of order.

The task is to aggregate all files and store them into some store.

The number of files is unbounded.

In a pseudo stream DSL that might look like:

topic('chunks')
  .groupByKey((fileId, chunk) -> fileId)
  .sortBy((fileId, chunk) -> chunk.offset)
  .aggregate((fileId, chunk) -> store.append(fileId, chunk));
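
For comparison, a rough sketch of how the aggregation part could look in the
actual Streams DSL, assuming the topic is already keyed by fileId; Chunk,
FileChunks, and the serdes are hypothetical helpers, and note the DSL has no
sortBy(), so out-of-order chunks must be handled inside the aggregator:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

builder.stream("chunks", Consumed.with(Serdes.String(), chunkSerde))
       .groupByKey()  // no repartition needed if the topic is keyed by fileId
       .aggregate(
           FileChunks::new,                            // empty buffer per fileId
           (fileId, chunk, agg) -> agg.append(chunk),  // must tolerate out-of-order offsets
           Materialized.with(Serdes.String(), fileChunksSerde));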


I want to understand whether Kafka Streams can solve this efficiently. 
Since the number of files is unbounded, how would Kafka manage intermediate 
topics for the groupBy operation? How many partitions will it use, etc.? I can't 
find these details in the docs. Also, let's say a chunk has a flag that 
indicates EOF. How can I indicate that a specific group will no longer have 
any new data?



That’s a copy of my stack overflow question.

—
Michael


Re: Re-key by multiple properties without composite key

2024-01-30 Thread Matthias J. Sax

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


Is this two independent FKs, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic as a KTable, you cannot repartition, because doing so 
violates the contract. A KTable must be partitioned by its primary key, 
i.e., the ID field, and thus the DSL does not offer a repartition option.


You could read the topic as a KStream though, and provide a custom 
`StreamPartitioner` for a `repartition()` operation. However, this is 
also "dangerous" because for a KStream it is also assumed that it is 
partitioned by its key, and you might break downstream DSL operators 
with such a violation of the "contract".
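
For illustration, a minimal sketch of that approach, assuming the `in` stream
and class A from your example below; `aSerde` and the fk1-based hashing are
placeholders:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Route each record to a partition derived from fk1 instead of the key.
StreamPartitioner<Long, A> byFk1 =
    (topic, key, value, numPartitions) ->
        Math.floorMod(value.fk1().hashCode(), numPartitions);

// Explicit repartition step; downstream operators still assume
// key-based partitioning, so use with care.
KStream<Long, A> partitionedByFk1 =
    in.repartition(Repartitioned.<Long, A>with(Serdes.Long(), aSerde)
                                .withStreamPartitioner(byFk1));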


Looking into your solution:


.toTable()
.groupBy(
(key, value) -> KeyValue.pair(value.fk1(), value),
Grouped.with(...))


This will set fk1 as the key, which does not seem to align with your previous 
comment that the key should stay the ID? (Same for fk2.)


Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's 
unclear what you are actually trying to do to begin with? It sounds like it's 
overall a self-join of the input topic on fk1 and fk2?



-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:

Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
 private Long id;
 private String someContent;
 private Long fk1;
 private Long fk2;
 // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2? Note that the
resulting key should not be changed or turned into some kind of
composite key as it is used in later join operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 accordingly and then outer
joining both resulting (re-keyed) KTables.

KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));

KTable<Long, Aggregate> rekeyedByFk1 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk1(), value),
        Grouped.with(...))
    .aggregate(
        Aggregate::new,
        (key, value, aggregate) -> aggregate.add(value),
        (key, value, aggregate) -> aggregate.remove(value),
        Materialized.with(...));

KTable<Long, Aggregate> rekeyedByFk2 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk2(), value),
        Grouped.with(...))
    .aggregate(
        ... same as above
    );

KTable<Long, Aggregate> joined = rekeyedByFk1
    .outerJoin(
        rekeyedByFk2,
        ...)
    .groupBy(KeyValue::pair, Grouped.with(...))
    .aggregate(...);

[...] would integrate the (already pre-joined) Aggregates so as
to avoid duplicates.

Does this seem like a viable solution, or are there better / simpler /
more efficient implementations?

Best wishes,
Karsten


Re: Kafka consumer group crashing and not able to consume once service is up

2024-01-30 Thread Matthias J. Sax

I am not sure I can follow completely.

From the figures you show, you have a topic with 4 partitions, and 4 
consumer groups. Thus, each consumer group should read all 4 partitions, 
but the figures indicate that each group would read a single partition only?


Can you clarify? Are you using `consumer.subscribe` or `consumer.assign`?
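
For reference, a minimal sketch of the difference between the two; the topic
name, partition, and the consumer construction are placeholders:

import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

// subscribe(): the group coordinator assigns partitions and rebalances
// them across all members of the consumer group.
consumer.subscribe(List.of("test"));

// assign(): the application picks partitions manually; no group
// membership or rebalancing is involved.
// consumer.assign(List.of(new TopicPartition("test", 0)));

consumer.poll(Duration.ofMillis(100));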

In general, it might be good to collect some INFO (or DEBUG) level logs 
for the crashing service after restart to see what it's doing.



-Matthias

On 1/30/24 7:17 AM, Marigowda, Santhosh Aditya wrote:

Hi Kafka Dev Team,

Could you please help us with our problem.

In our POC, if one of the Kafka consumers (services) shuts down or crashes, 
then after the service restarts none of the messages are consumed 
by the crashed service.


Other services are consuming without any issues.

[Figure: one of the services crashed / shut down]

If we rename the Kafka consumer group and start the service, then 
messages start being consumed.


Consumer configuration:

{
    delay: 1000
    timeout: 0
    topic-name = test
    handlers = ["Listener"]
    source = "kafka-consumer"
    enable_auto_commit = "false"
    group = "Consumer-Group-1"
}

local-kafka-consumer = {
    server = "{kafka-hostname}:{kafka-port}"
    deserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    auto_offset_reset = "latest"
    enable_auto_commit = "true"
    maxRequestSize = 20971520
}

Thanks,

Santhosh Aditya



[jira] [Resolved] (KAFKA-10184) Flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2024-01-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10184.
-
  Assignee: (was: John Roesler)
Resolution: Cannot Reproduce

This ticket is pretty old, and looking into Gradle Enterprise the test seems to 
be stable. Closing for now.

> Flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-10184
> URL: https://issues.apache.org/jira/browse/KAFKA-10184
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Critical
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 12. Input 
> records haven't all been written to the changelog: 442
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$6(TestUtils.java:401)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:449)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:398)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:149)
>   at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:91)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch

[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2024-01-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16017:

Fix Version/s: 3.5.3
   3.4.2
   3.6.2

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.4.2, 3.7.0, 3.6.2, 3.5.3
>
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which 
> it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
> if the task moves to a stream thread on the same Streams client, which shares 
> the same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue one can restart the stream thread and delete the 
> state on disk. After that the state restores completely from the changelog 
> topic and the state does not miss any records anymore.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since in step 2 the state directory is wiped out, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3, which might be fewer. The root cause of 
> this is that the offsets in the record collector are not cleaned up when the 
> record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.
> The repro can be started with
> {code}
> ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
> spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
> {code}
> The repro writes records into a state store and tries to retrieve them again 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
>  It will throw an {{IllegalStateException}} if it cannot find a record in the 
> state 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
>  Once the offsets in the record collector are cleared on close 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
>  and 
> https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
>  the {{IllegalStateException}} does not occur anymore.
> In the logs you can check for 
> - {{Restore batch end offset is}} which are the restored offsets in the state.
> - {{task [0_1] Writing checkpoint:}} which are the written checkpoints.
> - {{task [0_1] Checkpointable offsets}} which show the offsets coming from 
> the sending records to the changelog topic 
> {{RestoreIntegrationTesttest-stateStore-changelog-1}}
> Always the last instances of these before the {{IllegalStateException}} is 
> thrown.
> You will see that the restored offsets are less than the offsets that are 
> written to the checkpoint. The offsets written to the checkpoint come from 
> the offsets stored when sending the records to the changelog topic.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16141:

Fix Version/s: 3.7.0

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.7.0
>
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] 3.7.0 RC2

2024-01-17 Thread Matthias J. Sax

Stan, thanks for driving this all forward! Excellent job.

About


StreamsStandbyTask - https://issues.apache.org/jira/browse/KAFKA-16141
StreamsUpgradeTest - https://issues.apache.org/jira/browse/KAFKA-16139


For `StreamsUpgradeTest` it was a test setup issue and should be fixed 
now in trunk and 3.7 (and actually also in 3.6...)


For `StreamsStandbyTask` the failing test exposes a regression bug, so 
it's a blocker. I updated the ticket accordingly. We already have an 
open PR that reverts the code introducing the regression.



-Matthias

On 1/17/24 9:44 AM, Proven Provenzano wrote:

We have another blocking issue for the RC:
https://issues.apache.org/jira/browse/KAFKA-16157. This bug is similar to
https://issues.apache.org/jira/browse/KAFKA-14616. The new issue however
can lead to the new topic having partitions that a producer cannot write to.

--Proven

On Tue, Jan 16, 2024 at 12:04 PM Proven Provenzano 
wrote:



I have a PR https://github.com/apache/kafka/pull/15197 for
https://issues.apache.org/jira/browse/KAFKA-16131 that is building now.
--Proven

On Mon, Jan 15, 2024 at 5:03 AM Jakub Scholz  wrote:


> Hi Jakub,
>
> Thanks for trying the RC. I think what you found is a blocker bug because it
> will generate a huge amount of logspam. I guess we didn't find it in junit
> tests since logspam doesn't fail the automated tests. But certainly it's not
> suitable for production. Did you file a JIRA yet?

Hi Colin,

I opened https://issues.apache.org/jira/browse/KAFKA-16131.

Thanks & Regards
Jakub

On Mon, Jan 15, 2024 at 8:57 AM Colin McCabe  wrote:


Hi Stanislav,

Thanks for making the first RC. The fact that it's titled RC2 is messing
with my mind a bit. I hope this doesn't make people think that we're
farther along than we are, heh.

On Sun, Jan 14, 2024, at 13:54, Jakub Scholz wrote:

> Nice catch! It does seem like we should have gated this behind the metadata
> version as KIP-858 implies. Is the cluster configured with multiple log
> dirs? What is the impact of the error messages?

I did not observe any obvious impact. I was able to send and receive
messages as normal. But to be honest, I have no idea what else
this might impact, so I did not try anything special.

I think everyone upgrading an existing KRaft cluster will go through this
stage (running Kafka 3.7 with an older metadata version for at least a
while). So even if it is just a logged exception without any other impact, I
wonder if it might scare users from upgrading. But I leave it to others to
decide if this is a blocker or not.



Hi Jakub,

Thanks for trying the RC. I think what you found is a blocker bug because
it will generate a huge amount of logspam. I guess we didn't find it in junit
tests since logspam doesn't fail the automated tests. But certainly it's
not suitable for production. Did you file a JIRA yet?


On Sun, Jan 14, 2024 at 10:17 PM Stanislav Kozlovski
 wrote:


Hey Luke,

This is an interesting problem. Given the fact that the KIP for having a
3.8 release passed, I think it tips the scales towards not calling this a
blocker and expecting it to be solved in 3.7.1.

It is unfortunate that it does not seem safe to migrate to KRaft in 3.7.0
(given the inability to roll back safely), but if that's true - the same
case would apply for 3.6.0. So in any case users would be expected to use a
patch release for this.


Hi Luke,

Thanks for testing rollback. I think this is a case where the
documentation is wrong. The intention was for the steps to basically be:

1. roll all the brokers into ZK mode, but with migration enabled
2. take down the KRaft quorum
3. rmr /controller, allowing a hybrid broker to take over
4. roll all the brokers into ZK mode without migration enabled (if desired)

With these steps, there isn't really any unavailability, since a ZK controller
can be elected quickly after the KRaft quorum is gone.


Further, since we will have a 3.8 release - it is likely we will ultimately
recommend users upgrade from that version, given its aim is to have strategic
KRaft feature parity with ZK. That being said, I am not 100% on this. Let me
know whether you think this should block the release, Luke. I am also tagging
Colin and David to weigh in with their opinions, as they worked on the
migration logic.


The rollback docs are new in 3.7, so the fact that they're wrong is a clear
blocker, I think. But easy to fix, I believe. I will create a PR.

best,
Colin



Hey Kirk and Chris,

Unless I'm missing something - KAFKA-16029 is simply a bad log due to
improper closing. And the PR description implies this has been present
since 3.5. While annoying, I don't see a strong reason for this to block
the release.

Hey Jakub,

Nice catch! It does seem like we should have gated this behind the metadata
version as KIP-858 implies. Is the cluster configured with multiple log
dirs? What is the impact of the error messages?

Tagging Igor (the author of the KIP) to weigh 

[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16141:

Component/s: streams

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16139:

Component/s: streams
 system tests

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>  Components: streams, system tests
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bouncesArguments:\{
>  “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT”}
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15770) org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy is flaky

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15770:
---

Assignee: Matthias J. Sax

> org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
>  is flaky 
> ---
>
> Key: KAFKA-15770
> URL: https://issues.apache.org/jira/browse/KAFKA-15770
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Alok Thatikunta
>    Assignee: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> Test fails on CI, passes locally
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldHaveSamePositionBoundActiveAndStandBy/]
> {code:java}
> java.lang.AssertionError: 
> Result:SucceededQueryResult{result=<0,1698511250443>, executionInfo=[], 
> position=Position{position={input-topic={0=50
> Expected: is 
>  but: was  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16123) KStreamKStreamJoinProcessor forwards null records for left/outer joins unconditionally of the join window.

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16123:

Component/s: streams

> KStreamKStreamJoinProcessor forwards null records for left/outer joins 
> unconditionally of the join window.
> --
>
> Key: KAFKA-16123
> URL: https://issues.apache.org/jira/browse/KAFKA-16123
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Florin Akermann
>Assignee: Florin Akermann
>Priority: Major
>
> As part of KIP-962 the non-null key requirements have been relaxed for left 
> and outer joins.
> However, the implementation forwards null records for left/outer joins 
> unconditionally of the join window.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16159:

Affects Version/s: 3.7.0

> Prune excessive logging from Telemetry Reporter
> ---
>
> Key: KAFKA-16159
> URL: https://issues.apache.org/jira/browse/KAFKA-16159
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, log
>Affects Versions: 3.7.0
>Reporter: Philip Nee
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: consumer, logging
> Fix For: 3.8.0
>
>
> While running system tests locally, I've noticed excessive logging of the 
> Telemetry Reporter. This, I believe, was introduced in KIP-714.
> {code:java}
> [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
> returning the value 224678 ms; the client will wait before submitting the 
> next GetTelemetrySubscriptions network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
> This is logged several times per ms - Also, given the amount of log being 
> emitted, can we also check the CPU profile to see if there's a process 
> running a tight loop?
>  
> Update
> ---
> Looking from the beginning, is this caused by the following?
> {code:java}
> DEBUG The broker generated an error for the get telemetry network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
>     146 org.apache.kafka.common.errors.UnsupportedVersionException: The node 
> does not support GET_TELEMETRY_SUBSCRIPTIONS {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16159) Prune excessive logging from Telemetry Reporter

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16159:

Fix Version/s: (was: 3.8.0)

> Prune excessive logging from Telemetry Reporter
> ---
>
> Key: KAFKA-16159
> URL: https://issues.apache.org/jira/browse/KAFKA-16159
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, log
>Affects Versions: 3.7.0
>Reporter: Philip Nee
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: consumer, logging
>
> While running system tests locally, I've noticed excessive logging of the 
> Telemetry Reporter. This, I believe, was introduced in KIP-714.
> {code:java}
> [2024-01-15 09:44:16,911] DEBUG For telemetry state SUBSCRIPTION_NEEDED, 
> returning the value 224678 ms; the client will wait before submitting the 
> next GetTelemetrySubscriptions network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter) {code}
> This is logged several times per ms - Also, given the amount of log being 
> emitted, can we also check the CPU profile to see if there's a process 
> running a tight loop?
>  
> Update
> ---
> Looking from the beginning, is this caused by the following?
> {code:java}
> DEBUG The broker generated an error for the get telemetry network API request 
> (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
>     146 org.apache.kafka.common.errors.UnsupportedVersionException: The node 
> does not support GET_TELEMETRY_SUBSCRIPTIONS {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16139.
-
Fix Version/s: 3.7.0
   3.6.1
   Resolution: Fixed

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bouncesArguments:\{
>  “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT”}
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16141:

Issue Type: Bug  (was: Test)

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>    Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807890#comment-17807890
 ] 

Matthias J. Sax edited comment on KAFKA-16141 at 1/17/24 9:15 PM:
--

After discussion with Almog (and testing it) implementing `WrappedStateStore` 
won't do the trick. Given that this is a regression, it seems best (most 
pragmatic) to just revert the change on 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement 
`TimestampedBytesStore` for now (even if this still seems to be the right thing 
to do – however, it might require some larger changes to make it work – I 
filed a follow-up ticket for this: 
https://issues.apache.org/jira/browse/KAFKA-16158).


was (Author: mjsax):
After discussion with Almog (and testing it) implementing `WrappedStateStore` 
won't do the trick. Given that this is a regression, it seems best (most 
pragmatic) to just revert the change on 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement 
`TimestampedBytesStore` for now (even if this still seems to be the right thing 
to do – however, it might require some larger changes to make it work – will 
file a follow up ticket for this).

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>    Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16158) Cleanup usage of `TimestampedBytesStore` interface

2024-01-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16158:
---

 Summary: Cleanup usage of `TimestampedBytesStore` interface
 Key: KAFKA-16158
 URL: https://issues.apache.org/jira/browse/KAFKA-16158
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


We added the `TimestampedBytesStore` interface many releases ago. Its purpose is to 
indicate if a byte-store's binary value contains a "plain value" or a 
"<timestamp,value>" format. Stores with the "<timestamp,value>" format should implement the 
interface; however, not all stores with this format do.

We tried to fix one occurrence via 
https://issues.apache.org/jira/browse/KAFKA-15629 by adding 
`TimestampedBytesStore` to `KeyValueToTimestampedKeyValueByteStoreAdapter`, 
however this change broke the restore code path (cf. 
https://issues.apache.org/jira/browse/KAFKA-16141) and thus we reverted the 
change.

During the investigation, we also noticed that 
`InMemoryTimestampedKeyValueStoreMarker` implements `TimestampedBytesStore` but 
does not do a byte-array translation (it's unclear why no byte-array 
translation happens) – and it's also unclear if the in-memory store is tested 
properly.

We should try to clean this all up, adding `TimestampedBytesStore` to 
`KeyValueToTimestampedKeyValueByteStoreAdapter`, and figure out how to avoid 
breaking the restore code path. In addition, we should verify if 
`InMemoryTimestampedKeyValueStoreMarker` is correct or not, and if the restore 
code path (and maybe also IQv2 code path) is tested properly and correct.
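
For illustration, a minimal sketch of the "<timestamp,value>" byte layout
referred to above, assuming the usual 8-byte-timestamp-prefix convention;
this is not the actual Kafka Streams helper:

{code:java}
import java.nio.ByteBuffer;

// Sketch of the assumed "<timestamp,value>" layout: an 8-byte record
// timestamp prefixed to the plain value bytes. This is an illustration
// of the format, not the actual Kafka Streams helper.
class TimestampedFormat {
    static byte[] toTimestampedFormat(final long timestamp, final byte[] plainValue) {
        return ByteBuffer.allocate(8 + plainValue.length)
                         .putLong(timestamp)   // 8-byte timestamp prefix
                         .put(plainValue)      // original value bytes
                         .array();
    }
}
{code}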



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807890#comment-17807890
 ] 

Matthias J. Sax commented on KAFKA-16141:
-

After discussion with Almog (and testing it) implementing `WrappedStateStore` 
won't do the trick. Given that this is a regression, it seems best (most 
pragmatic) to just revert the change on 
`KeyValueToTimestampedKeyValueByteStoreAdapter` and don't let it implement 
`TimestampedBytesStore` for now (even if this still seems to be the right thing 
to do – however, it might require some larger changes to make it work – will 
file a follow up ticket for this).

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>    Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16141:
---

Assignee: Matthias J. Sax  (was: Almog Gavra)

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>    Assignee: Matthias J. Sax
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-17 Thread Matthias J. Sax
You cannot add a `Processor`. You can only use `aggregate() / reduce() / 
count()` (which of course will add a pre-defined processor).


`groupByKey()` is really just a "meta operation" that checks if the key 
was changed upstream, and inserts a repartition/shuffle step if necessary.


Thus, if you don't change the upstream key, you can just add a processor 
to `someStream` (groupByKey() would be a no-op anyway).


If you did change the key upstream, you can do 
`someStream.repartition().transform()` to repartition explicitly.
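
For illustration, a minimal sketch of that pattern; `Event` and `shouldDrop()`
are placeholders for your record type and drop condition:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

KStream<String, Event> repartitioned = someStream.repartition();

repartitioned.transform(() -> new Transformer<String, Event, KeyValue<String, Event>>() {
    @Override
    public void init(final ProcessorContext context) { }

    @Override
    public KeyValue<String, Event> transform(final String key, final Event value) {
        // Returning null swallows the record, i.e., it is not sent downstream.
        return value.shouldDrop() ? null : KeyValue.pair(key, value);
    }

    @Override
    public void close() { }
});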



HTH.

On 1/13/24 3:14 AM, Igor Maznitsa wrote:
Thanks a lot for the explanation, but could you provide a bit more detail 
about KGroupedStream? It is just an interface and does not extend KStream, so 
how can I add a processor in the case below?

KStream someStream = ...;
someStream
    .groupByKey()
    // how to add a processor for the resulting grouped stream here ???

On 2024-Jan-13 01:22, Matthias J. Sax wrote:
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying 
you need to use `KTable#groupBy()` (data needs to be repartitioned if 
you change the key).


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable 
and KGroupedStream like KStream#transform? How can I provide a custom 
processor for KTable or KGroupedStream which could, for instance, provide 
a way to not forward selected events downstream?







[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807543#comment-17807543
 ] 

Matthias J. Sax commented on KAFKA-16141:
-

Thanks. I am wondering if the right fix would be to change 
`KeyValueToTimestampedKeyValueByteStoreAdapter` to implement 
`WrappedStateStore` – in the end, it is a wrapper, but the restore code path 
does not recognize it as such, and thus cannot "unwrap" its inner store to 
pick the right converter? – Comparing to 
`InMemoryTimestampedKeyValueStoreMarker`, it also implements both 
`WrappedStateStore` and `TimestampedBytesStore`?

We did consider it a bug-fix to add `TimestampedBytesStore` to 
`KeyValueToTimestampedKeyValueByteStoreAdapter` because, in the end, it does 
expect the timestamped value byte format on `put()` and also returns this 
format on `get()`.

It's just that the restore code path is not interested in some upper layer, but 
only in the innermost wrapped store type?
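
To make the wrapping idea concrete, a simplified sketch (these types are 
illustrative stand-ins, not the actual Kafka Streams internals):

{code:java}
// Illustrative stand-ins for the real interfaces discussed above.
interface StateStore {}

// Marker: store expects/returns the <timestamp+value> byte format.
interface TimestampedBytesStore {}

// A wrapper that exposes its inner store so callers can unwrap it.
interface WrappedStateStore extends StateStore {
    StateStore wrapped();
}

final class RestorePathSketch {
    // The restore code path can only pick the right record converter if it
    // can unwrap down to the innermost store and check the marker interface.
    static boolean needsTimestampConversion(StateStore store) {
        while (true) {
            if (store instanceof TimestampedBytesStore) {
                return true;
            }
            if (store instanceof WrappedStateStore) {
                store = ((WrappedStateStore) store).wrapped();
            } else {
                return false;
            }
        }
    }
}
{code}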

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807411#comment-17807411
 ] 

Matthias J. Sax commented on KAFKA-16141:
-

Assigned to [~agavra] and marked as blocker. Might be a regression introduced 
via KIP-954.

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16141:

Priority: Blocker  (was: Major)

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16141:
---

Assignee: Almog Gavra

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Major
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PROPOSAL] Add commercial support page on website

2024-01-12 Thread Matthias J. Sax

François,

thanks for starting this initiative. Personally, I don't think it's 
necessarily harmful for the project to add such a new page, however, I 
share the same concerns others raised already.


I understand your motivation that people had issues finding commercial 
support, but I am not sure we can address this issue that way. I am also 
"worried" (for lack of a better word) that the page might become 
long and unwieldy. In the end, any freelancer/consultant offering Kafka 
services would be able to get on the page, so we might get hundreds of 
entries, which would also make it impossible for users to find what they are 
looking for. Also, the services of different companies might vary 
drastically; should users read all these descriptions? I can also 
imagine that some companies offer their services only in some 
countries/regions, making it even harder for users to find what they are 
looking for.


Overall, it sounds more like a search optimization problem, and thus it 
seems out of scope for what we can solve. As I said, I am not strictly 
against it, but I just don't see much value either.



-Matthias

On 1/11/24 12:55 PM, Francois Papon wrote:

Hi Justine,

You're right, Kafka is a part of my business (training, consulting, 
architecture design, sla...) and most of the time, users/customers said 
that it was hard for them to find commercial support (in France, in my 
case) after searching on the Kafka website (Google didn't help them).


As an ASF member and PMC of several ASF projects, I know that this kind 
of page exists, which is why I made this proposal for the Kafka project: 
I really think that it can help users.


As you suggest, I can submit a PR to be added on the "powered by" page.

Thanks,

François

On 11/01/2024 21:00, Justine Olshan wrote:

Hey François,

My point was that the companies on that page use kafka as part of their
business. If you use Kafka as part of your business feel free to submit a
PR to be added.

I second Chris's point that other projects are not enough to require 
Kafka

having such a support page.

Justine

On Thu, Jan 11, 2024 at 11:57 AM Chris Egerton 
wrote:


Hi François,

Is it an official policy of the ASF that projects provide a listing of
commercial support options for themselves? I understand that other 
projects

have chosen to provide one, but this doesn't necessarily imply that all
projects should do the same, and I can't say I find this point very
convincing as a rebuttal to some of the good-faith concerns raised by 
the

PMC and members of the community so far. However, if there's an official
ASF stance on this topic, then I acknowledge that Apache Kafka should 
align

with it.

Best,

Chris


On Thu, Jan 11, 2024, 14:50 fpapon  wrote:


Hi Justine,

I'm not sure I see the difference between "happy users" and vendors
that advertise their products in the company list on the
"powered by" page.

Btw, the initial purpose of my proposal was to help users find support
for production stuff rather than searching on Google.

I don't think this is a bad thing, because it is something that already
exists in many ASF projects, for example:

https://hop.apache.org/community/commercial/
https://struts.apache.org/commercial-support.html
https://directory.apache.org/commercial-support.html
https://tomee.apache.org/commercial-support.html
https://plc4x.apache.org/users/commercial-support.html
https://camel.apache.org/community/support/
https://openmeetings.apache.org/commercial-support.html
https://guacamole.apache.org/support/



https://cwiki.apache.org/confluence/display/HADOOP2/Distributions+and+Commercial+Support
https://activemq.apache.org/support

https://netbeans.apache.org/front/main/help/commercial-support/
https://royale.apache.org/royale-commercial-support/

https://karaf.apache.org/community.html

As I understand it for now, the channels for users to find production
support are:

- The mailing list (u...@kafka.apache.org / d...@kafka.apache.org)

- The official #kafka ASF Slack channel (maybe we can add it to the
website, because I didn't find it there => https://kafka.apache.org/contact)

- Searching on Google for commercial support

I can update my PR to mention only the 3 points above for the "get
support" page if people think that having a support page makes sense.

regards,

François

On 11/01/2024 19:34, Justine Olshan wrote:

I think there is a difference between the "Powered by" page and a page for
vendors to advertise their products and services.

The idea is that the companies on that page are "powered by" Kafka. They
serve as examples of happy users of Kafka.
I don't think it is meant only as a place just for those companies to
advertise.

I'm a little confused by

In this case, I'm ok to say that the commercial support section in 
the

"Get support" is no need as we can use this page.

If you plan to submit for this page, please include a description on how
your company uses 

Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Matthias J. Sax
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying you 
need to use `KTable#groupBy()` (data needs to be repartitioned if you 
change the key).
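
For illustration, a value-only transformation on a KTable could look like this 
(a minimal sketch; the topic name and types are made up):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KTableTransformValuesExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table = builder.table("table-topic");

        // The key is read-only here, so the keying is preserved.
        final ValueTransformerWithKeySupplier<String, String, String> toUpperCase =
                () -> new ValueTransformerWithKey<String, String, String>() {
                    @Override
                    public void init(final ProcessorContext context) {}

                    @Override
                    public String transform(final String readOnlyKey, final String value) {
                        return value == null ? null : value.toUpperCase();
                    }

                    @Override
                    public void close() {}
                };

        final KTable<String, String> upperCased = table.transformValues(toUpperCase);

        // To change the key, use KTable#groupBy() plus an aggregation instead,
        // which repartitions the data.
    }
}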


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable 
and KGroupedStream like KStream#transform? How can I provide a custom 
processor for KTable or KGroupedStream which could, for instance, provide 
a way to not forward selected events downstream?





Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Matthias J. Sax
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying you 
need to use `KTable#groupBy()` (data needs to be repartitioned if you 
change the key).


HTH.

-Matthias

On 1/12/24 3:09 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to have processors for KTable and 
KGroupedStream like KStream#transform? How can I provide a complex 
processor for KTable or KGroupedStream which could provide a way to not 
forward events downstream for some business logic?





[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806217#comment-17806217
 ] 

Matthias J. Sax commented on KAFKA-16089:
-

Great job!

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Critical
> Fix For: 3.8.0
>
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14949) Add Streams upgrade tests from AK 3.4

2024-01-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14949.
-
Resolution: Fixed

> Add Streams upgrade tests from AK 3.4
> -
>
> Key: KAFKA-14949
> URL: https://issues.apache.org/jira/browse/KAFKA-14949
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Victoria Xia
>Assignee: Mickael Maison
>Priority: Critical
>
> Streams upgrade tests currently only test upgrading from 3.3 and earlier 
> versions 
> ([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]).
>  We should add 3.4 as an "upgrade_from" version into these tests, in light of 
> the upcoming 3.5 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14949) Add Streams upgrade tests from AK 3.4

2024-01-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14949:
---

Assignee: Mickael Maison

> Add Streams upgrade tests from AK 3.4
> -
>
> Key: KAFKA-14949
> URL: https://issues.apache.org/jira/browse/KAFKA-14949
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Victoria Xia
>Assignee: Mickael Maison
>Priority: Critical
>
> Streams upgrade tests currently only test upgrading from 3.3 and earlier 
> versions 
> ([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]).
>  We should add 3.4 as an "upgrade_from" version into these tests, in light of 
> the upcoming 3.5 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16092) Queues for Kafka

2024-01-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16092:

Issue Type: New Feature  (was: Improvement)

> Queues for Kafka
> 
>
> Key: KAFKA-16092
> URL: https://issues.apache.org/jira/browse/KAFKA-16092
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>
> This Jira tracks the development of KIP-932: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803543#comment-17803543
 ] 

Matthias J. Sax commented on KAFKA-14404:
-

It seems [~sujayopensource] lost interest in working on this. So yes, please feel 
free to pick it up.

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Apache Kafka 3.7.0 Release

2024-01-05 Thread Matthias J. Sax
One more blocker (memory leak) for 3.7: 
https://issues.apache.org/jira/browse/KAFKA-16086



-Matthias

On 1/4/24 9:53 PM, Stanislav Kozlovski wrote:

Thanks Apoorv, I was going to update the mailing thread as well.

Major kudos to Apoorv for the thorough work debugging and getting to the
bottom of this tricky publishing issue, a subtle regression from the work
he did in making the kafka-clients jar shadowed.

On Thu, Jan 4, 2024 at 5:09 PM Apoorv Mittal 
wrote:


Hi Stan,
I have opened the minor PR: https://github.com/apache/kafka/pull/15127 to
fix publishing the dependency. Once discussed and merged in trunk, I'll
update the 3.7 branch as well.

Regards,
Apoorv Mittal
+44 7721681581


On Thu, Jan 4, 2024 at 12:49 PM Matthias J. Sax  wrote:


We found a blocker for 3.7:
https://issues.apache.org/jira/browse/KAFKA-16077

Already having a PR under review to fix it.


-Matthias

On 1/3/24 10:43 AM, Stanislav Kozlovski wrote:

Hey all, happy new year.

Thanks for the heads up Almog. Makes sense.

To give an update - I haven't been able to resolve the gradlewAll

publish

failure, and as such haven't been able to release an RC.
As a minor barrier, I have to also update the year in the NOTICE file,
otherwise the release script won't let me continue -
https://github.com/apache/kafka/pull/15111

Me and Apoorv synced offline and ran a few tests to debug the issue
regarding the clients build. I successfully executed `publish` when
pointing toward a custom jfrog repo with both JDK 8 and 17. Inspecting

the

debug logs, the task that previously failed
`:clients:publishMavenJavaPublicationToMavenRepository'` passed
successfully. Here's a sample of the logs -




https://gist.github.com/stanislavkozlovski/841060cb467ec1d179cc9f293c8702e7


Having read the release.py script a few times, I am not able to see

what

is

different in the setup there. It simply clones the repo anew, gets the

3.7

branch and runs the same command.

At this point, I am contemplating pushing a commit to 3.7 that modifies

the

release.py file that enables debug on the command:
diff --git a/release.py b/release.py
index 43c5809861..e299e10e74 100755
--- a/release.py
+++ b/release.py
@@ -675,7 +675,7 @@ with
open(os.path.expanduser("~/.gradle/gradle.properties")) as f:
   contents = f.read()
   if not user_ok("Going to build and upload mvn artifacts based on

these

settings:\n" + contents + '\nOK (y/n)?: '):
   fail("Retry again later")
-cmd("Building and uploading archives", "./gradlewAll publish",
cwd=kafka_dir, env=jdk8_env, shell=True)
+cmd("Building and uploading archives", "./gradlewAll publish --debug",
cwd=kafka_dir, env=jdk8_env, shell=True)
   cmd("Building and uploading archives", "mvn deploy -Pgpg-signing",
cwd=streams_quickstart_dir, env=jdk8_env, shell=True)

   release_notification_props = { 'release_version': release_version,
(END)

and continuing to debug through that.

Since the release.py script grabs a new copy of origin, we have to

modify

upstream. An alternative is for me to use my local github Kafka repo,

but

that may result in the script pushing a build of that into the remote
servers.

On Tue, Jan 2, 2024 at 8:17 PM Almog Gavra 

wrote:



Hello Stan,

I wanted to give you a heads up that
https://github.com/apache/kafka/pull/15073 (
https://issues.apache.org/jira/browse/KAFKA-16046) was identified as

a

blocker regression and should be merged to trunk by EOD.

Cheers,
Almog

On Tue, Jan 2, 2024 at 4:20 AM Stanislav Kozlovski
 wrote:


Hi Apoorv,

Thanks for taking ownership and looking into this! One more caveat is

that

I believe this first publish is ran with JDK 8, as the release.py

runs

with

both JDK 8 and (if I recall correctly) 17 versions. This seems to

fail

on

the first one - so JDK 8.
Not sure if that is related in any way. And I'm also not sure if it

should

be kafka-clients or just clients.

On Sat, Dec 30, 2023 at 10:48 AM Apoorv Mittal <

apoorvmitta...@gmail.com


wrote:


Hi Stan,
Thanks for looking into the release. I worked with `./gradlewAll
publishToMavenLocal` which generates the respective

`kafka-clients.jar`

and deploys to maven local, I believed that `./gradlewAll publish`

should

just publish the artifacts to remote repository and hence should

always

work as jars successfully gets deployed to local maven.

Though now I set up the remote private maven repository for myself

(on

jfrog) and tried `./gradlewAll publish` on the 3.7 branch and
successfully completed the build with all artifacts uploaded to the

remote

repository. What seems strange to me is the error you mentioned in

the

previous email regarding the reference of the clients jar. I suppose

the

reference should be to `kafka-clients.jar` rather than

`clients.jar`,

I

might be missing if something else gets triggered in the release

pipeline.

Do you think I should set up the remote repository as per the

instru

Re: Apache Kafka 3.7.0 Release

2024-01-04 Thread Matthias J. Sax
We found a blocker for 3.7: 
https://issues.apache.org/jira/browse/KAFKA-16077


Already having a PR under review to fix it.


-Matthias

On 1/3/24 10:43 AM, Stanislav Kozlovski wrote:

Hey all, happy new year.

Thanks for the heads up Almog. Makes sense.

To give an update - I haven't been able to resolve the gradlewAll publish
failure, and as such haven't been able to release an RC.
As a minor barrier, I have to also update the year in the NOTICE file,
otherwise the release script won't let me continue -
https://github.com/apache/kafka/pull/15111

Me and Apoorv synced offline and ran a few tests to debug the issue
regarding the clients build. I successfully executed `publish` when
pointing toward a custom jfrog repo with both JDK 8 and 17. Inspecting the
debug logs, the task that previously failed
`:clients:publishMavenJavaPublicationToMavenRepository'` passed
successfully. Here's a sample of the logs -
https://gist.github.com/stanislavkozlovski/841060cb467ec1d179cc9f293c8702e7

Having read the release.py script a few times, I am not able to see what is
different in the setup there. It simply clones the repo anew, gets the 3.7
branch and runs the same command.

At this point, I am contemplating pushing a commit to 3.7 that modifies the
release.py file that enables debug on the command:
diff --git a/release.py b/release.py
index 43c5809861..e299e10e74 100755
--- a/release.py
+++ b/release.py
@@ -675,7 +675,7 @@ with
open(os.path.expanduser("~/.gradle/gradle.properties")) as f:
  contents = f.read()
  if not user_ok("Going to build and upload mvn artifacts based on these
settings:\n" + contents + '\nOK (y/n)?: '):
  fail("Retry again later")
-cmd("Building and uploading archives", "./gradlewAll publish",
cwd=kafka_dir, env=jdk8_env, shell=True)
+cmd("Building and uploading archives", "./gradlewAll publish --debug",
cwd=kafka_dir, env=jdk8_env, shell=True)
  cmd("Building and uploading archives", "mvn deploy -Pgpg-signing",
cwd=streams_quickstart_dir, env=jdk8_env, shell=True)

  release_notification_props = { 'release_version': release_version,
(END)

and continuing to debug through that.

Since the release.py script grabs a new copy of origin, we have to modify
upstream. An alternative is for me to use my local github Kafka repo, but
that may result in the script pushing a build of that into the remote
servers.

On Tue, Jan 2, 2024 at 8:17 PM Almog Gavra  wrote:


Hello Stan,

I wanted to give you a heads up that
https://github.com/apache/kafka/pull/15073 (
https://issues.apache.org/jira/browse/KAFKA-16046) was identified as a
blocker regression and should be merged to trunk by EOD.

Cheers,
Almog

On Tue, Jan 2, 2024 at 4:20 AM Stanislav Kozlovski
 wrote:


Hi Apoorv,

Thanks for taking ownership and looking into this! One more caveat is

that

I believe this first publish is ran with JDK 8, as the release.py runs

with

both JDK 8 and (if I recall correctly) 17 versions. This seems to fail on
the first one - so JDK 8.
Not sure if that is related in any way. And I'm also not sure if it

should

be kafka-clients or just clients.

On Sat, Dec 30, 2023 at 10:48 AM Apoorv Mittal 
Hi Stan,
Thanks for looking into the release. I worked with `./gradlewAll
publishToMavenLocal` which generates the respective `kafka-clients.jar`
and deploys to maven local, I believed that `./gradlewAll publish`

should

just publish the artifacts to remote repository and hence should always
work as jars successfully gets deployed to local maven.

Though now I set up the remote private maven repository for myself (on
jfrog) and tried `./gradlewAll publish` on the 3.7 branch and
successfully completed the build with all artifacts uploaded to the

remote

repository. What seems strange to me is the error you mentioned in the
previous email regarding the reference of the clients jar. I suppose

the

reference should be to `kafka-clients.jar` rather than `clients.jar`, I
might be missing if something else gets triggered in the release

pipeline.

Do you think I should set up the remote repository as per the

instructions

in `release.py` and try running `./release.py` as that might do

something

different, though I suspect that it should?

[image: Screenshot 2023-12-30 at 9.33.42 AM.png]


Regards,
Apoorv Mittal


On Fri, Dec 29, 2023 at 2:13 AM Colin McCabe 

wrote:



Just to update this thread, everything in KAFKA-14127 is done now. A

few

tasks got moved to a separate umbrella JIRA.

Some folks are going to do more testing, both manual and automated, in
the next week or two. I think this will give us a good indicator of
stability and what we need to fix.

Right now I'm leaning towards just making it GA since that's how most
features work. It's kind of rare for us to do a multi-step rollout for

new

features.

best,
Colin


On Wed, Dec 20, 2023, at 03:43, Mickael Maison wrote:

Hi,

With the current timeline for 3.7, I tend to agree with Viktor that
JBOD support in KRaft is unlikely to receive the extensive testing
this 

[jira] [Updated] (KAFKA-16077) Streams fails to close task after restoration when input partitions are updated

2024-01-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16077:

Priority: Blocker  (was: Critical)

> Streams fails to close task after restoration when input partitions are 
> updated
> ---
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Blocker
>
> There is a race condition in the state updater that can cause the following:
>  # We have an active task in the state updater
>  # We get fenced. We recreate the producer, transactions now uninitialized. 
> We ask the state updater to give back the task, add a pending action to close 
> the task clean once it’s handed back
>  # We get a new assignment with updated input partitions. The task is still 
> owned by the state updater, so we ask the state updater again to hand it back 
> and add a pending action to update its input partition
>  # The task is handed back by the state updater. We update its input 
> partitions but forget to close it clean (pending action was overwritten)
>  # Now the task is in an initialized state, but the underlying producer does 
> not have transactions initialized
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
>  Exception caught in process. taskId=1_0, 
> processor=KSTREAM-SOURCE-05, topic=node-name-repartition, 
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: 
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: 
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log- at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/s
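
A minimal sketch of the pending-action overwrite described in steps 2-4 above 
(names are hypothetical, not the actual state-updater code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: only one pending action per task is stored, so
// registering a second action silently drops the first one.
enum PendingAction { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }

final class PendingActions {
    private final Map<String, PendingAction> pending = new HashMap<>();

    void register(final String taskId, final PendingAction action) {
        pending.put(taskId, action); // put() replaces an earlier pending action
    }

    PendingAction onHandBack(final String taskId) {
        return pending.remove(taskId);
    }
}

// register("1_0", PendingAction.CLOSE_CLEAN);             // step 2 (after fencing)
// register("1_0", PendingAction.UPDATE_INPUT_PARTITIONS); // step 3 overwrites step 2
// onHandBack("1_0") -> UPDATE_INPUT_PARTITIONS            // step 4: the clean close is lost
{code}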

Re: [ANNOUNCE] New Kafka PMC Member: Divij Vaidya

2024-01-02 Thread Matthias J. Sax

Congrats!

On 12/29/23 9:40 AM, Viktor Somogyi-Vass wrote:

Congrats Divij, well deserved!

On Fri, Dec 29, 2023, 09:36 Lucas Brutschy 
wrote:


Congratulations, Divij!

On Fri, Dec 29, 2023 at 1:32 AM Colin McCabe  wrote:


Congratulations, Divij!

best,
Colin

On Thu, Dec 28, 2023, at 11:38, Bruno Cadonna wrote:

Congratulations Divij! Well deserved!

Best,
Bruno

On 12/27/23 12:45 PM, Luke Chen wrote:

Hi, Everyone,

Divij has been a Kafka committer since June, 2023. He has remained

very

active and instructive in the community since becoming a committer.

It's my

pleasure to announce that Divij is now a member of Kafka PMC.

Congratulations Divij!

Luke
on behalf of Apache Kafka PMC







Re: [DISCUSS] Connect Jira component name

2024-01-02 Thread Matthias J. Sax

SGTM.

I always found it odd that it's named `KafkaConnect` :)


-Matthias

On 12/26/23 6:03 PM, Greg Harris wrote:

Hi Connect Developers,

I noticed recently that we had two Jira components: "KafkaConnect" and
"connect", one with >1000 issues and one with 20 issues. I merged the
two tags, leaving the one labeled "KafkaConnect".

"KafkaConnect" doesn't follow the standard naming convention set by
all of the other components, while "connect" does. Should we rename
"KafkaConnect" to "connect" moving forward?

Forgive me for bikeshedding,
Greg


[jira] [Updated] (KAFKA-16055) Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders

2024-01-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16055:

Component/s: streams

> Thread unsafe use of HashMap stored in QueryableStoreProvider#storeProviders
> 
>
> Key: KAFKA-16055
> URL: https://issues.apache.org/jira/browse/KAFKA-16055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Kohei Nozaki
>Priority: Minor
>
> This was originally raised in [a kafka-users 
> post|https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol].
> There is a HashMap stored in QueryableStoreProvider#storeProviders ([code 
> link|https://github.com/apache/kafka/blob/3.6.1/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java#L39])
>  which can be mutated by a KafkaStreams#removeStreamThread() call. This can 
> be problematic when KafkaStreams#store is called from a separate thread.
> We need to somehow make this part of the code thread-safe by replacing the map 
> with a ConcurrentHashMap and/or using an existing locking mechanism.
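
A minimal sketch of the proposed direction (an illustrative stand-in, not the 
actual QueryableStoreProvider code):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A ConcurrentHashMap makes lookups from KafkaStreams#store() safe against
// concurrent mutation by removeStreamThread() from another thread.
final class StoreProvidersSketch<P> {
    private final Map<String, P> storeProviders = new ConcurrentHashMap<>();

    void onThreadAdded(final String threadName, final P provider) {
        storeProviders.put(threadName, provider);
    }

    void onThreadRemoved(final String threadName) { // cf. removeStreamThread()
        storeProviders.remove(threadName);
    }

    P providerFor(final String threadName) {        // cf. KafkaStreams#store()
        return storeProviders.get(threadName);
    }
}
{code}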



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16046:

Fix Version/s: 3.7.0

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
> Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method

2023-12-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798908#comment-17798908
 ] 

Matthias J. Sax commented on KAFKA-16018:
-

Timing is a little unclear to be fair – there is an open discussion whether the next 
release (i.e., after 3.7, which we are currently rolling out) will be 3.8 or 4.0... 
But even if it's 3.8, it's an open question whether it's worth putting a fix into 3.8 
or just waiting for 4.0 (not sure how complex a fix would be...)

> KafkaStreams can go into a zombie state if UncaughtExceptionHandler is 
> specified via the deprecated method
> --
>
> Key: KAFKA-16018
> URL: https://issues.apache.org/jira/browse/KAFKA-16018
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Tommy Becker
>Priority: Major
>
> We have a streams application in which all StreamThreads died due to a lack 
> of disk space. To our surprise, the KafkaStreams instance still reported its 
> state as running. Upon further investigation, it appears this is due to the 
> application setting an UncaughtExceptionHandler via the deprecated method 
> (this application was recently upgraded from 2.4.1): 
> [https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)]
> The only way a StreamThread failure can cause the KafkaStreams instance to 
> transition to an error state now is via the new 
> StreamsUncaughtExceptionHandler machinery, but when an old 
> UncaughtExceptionHandler is set by the old method this code is effectively 
> bypassed.
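
For comparison, a sketch of the new-style handler (assuming an already-built 
KafkaStreams instance named `streams`):

{code:java}
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

// With the new handler, a fatal StreamThread error (e.g. no disk space left)
// can shut the client down instead of leaving it as a RUNNING zombie.
streams.setUncaughtExceptionHandler(exception ->
        StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
{code}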



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-12-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15448.
-
Fix Version/s: 3.7.0
 Assignee: Colt McNealy
   Resolution: Fixed

> Streams StandbyTaskUpdateListener
> -
>
> Key: KAFKA-15448
> URL: https://issues.apache.org/jira/browse/KAFKA-15448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Colt McNealy
>Assignee: Colt McNealy
>Priority: Minor
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
> ​
> public enum SuspendReason
> { MIGRATED, PROMOTED; }
>  
> /**
>  * Method called upon the creation of the Standby Task.
> *
>  * @param topicPartition the TopicPartition of the Standby Task.
>  * @param storeName the name of the store being watched by this Standby Task.
>  * @param earliestOffset the earliest offset available on the Changelog topic.
>  * @param startingOffset the offset from which the Standby Task starts 
> watching.
>  * @param currentEndOffset the current latest offset on the associated 
> changelog partition.
> */
> void onTaskCreated(final TopicPartition topicPartition,
> final String storeName,
> final long earliestOffset,
> final long startingOffset,
> final long currentEndOffset);
> ​
> /**
>  * Method called after restoring a batch of records. In this case the maximum 
> size of the batch is whatever
>  * the value of the MAX_POLL_RECORDS is set to.
> *
>  * This method is called after restoring each batch and it is advised to keep 
> processing to a minimum.
>  * Any heavy processing will hold up recovering the next batch, hence slowing 
> down the restore process as a
>  * whole.
> *
>  * If you need to do any extended processing or connecting to an external 
> service consider doing so asynchronously.
> *
>  * @param topicPartition the TopicPartition containing the values to restore
>  * @param storeName the name of the store undergoing restoration
>  * @param batchEndOffset the inclusive ending offset for the current restored 
> batch for this TopicPartition
>  * @param numRestored the total number of records restored in this batch for 
> this TopicPartition
>  * @param currentEndOffset the current end offset of the changelog topic 
> partition.
> */
> void onBatchRestored(final TopicPartition topicPartition,
> final String storeName,
> final long batchEndOffset,
> final long numRestored,
> final long currentEndOffset);
> ​
> /**
>  * Method called after a Standby Task is closed, either because the task 
> migrated to a new instance or because the task was promoted to an Active task.
> */
> void onTaskSuspended(final TopicPartition topicPartition,
> final String storeName,
> final long storeOffset,
> final long currentEndOffset,
> final SuspendReason reason);
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-12-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15448:

Issue Type: New Feature  (was: Improvement)

> Streams StandbyTaskUpdateListener
> -
>
> Key: KAFKA-15448
> URL: https://issues.apache.org/jira/browse/KAFKA-15448
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Colt McNealy
>Assignee: Colt McNealy
>Priority: Minor
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
> ​
> public enum SuspendReason
> { MIGRATED, PROMOTED; }
>  
> /**
>  * Method called upon the creation of the Standby Task.
> *
>  * @param topicPartition the TopicPartition of the Standby Task.
>  * @param storeName the name of the store being watched by this Standby Task.
>  * @param earliestOffset the earliest offset available on the Changelog topic.
>  * @param startingOffset the offset from which the Standby Task starts 
> watching.
>  * @param currentEndOffset the current latest offset on the associated 
> changelog partition.
> */
> void onTaskCreated(final TopicPartition topicPartition,
> final String storeName,
> final long earliestOffset,
> final long startingOffset,
> final long currentEndOffset);
> ​
> /**
>  * Method called after restoring a batch of records. In this case the maximum 
> size of the batch is whatever
>  * the value of the MAX_POLL_RECORDS is set to.
> *
>  * This method is called after restoring each batch and it is advised to keep 
> processing to a minimum.
>  * Any heavy processing will hold up recovering the next batch, hence slowing 
> down the restore process as a
>  * whole.
> *
>  * If you need to do any extended processing or connecting to an external 
> service consider doing so asynchronously.
> *
>  * @param topicPartition the TopicPartition containing the values to restore
>  * @param storeName the name of the store undergoing restoration
>  * @param batchEndOffset the inclusive ending offset for the current restored 
> batch for this TopicPartition
>  * @param numRestored the total number of records restored in this batch for 
> this TopicPartition
>  * @param currentEndOffset the current end offset of the changelog topic 
> partition.
> */
> void onBatchRestored(final TopicPartition topicPartition,
> final String storeName,
> final long batchEndOffset,
> final long numRestored,
> final long currentEndOffset);
> ​
> /**
>  * Method called after a Standby Task is closed, either because the task 
> migrated to a new instance or because the task was promoted to an Active task.
> */
> void onTaskSuspended(final TopicPartition topicPartition,
> final String storeName,
> final long storeOffset,
> final long currentEndOffset,
> final SuspendReason reason);
> }
> ```
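[Editorial note: as a usage illustration only, here is a minimal logging implementation of the interface proposed above (with the missing comma in onTaskCreated fixed). Note that the listener KIP-988 ultimately merged ships as org.apache.kafka.streams.processor.StandbyUpdateListener, registered via KafkaStreams#setStandbyUpdateListener, and its method names differ slightly from this proposal; the sketch below targets the proposal as quoted.]

```java
import org.apache.kafka.common.TopicPartition;

// Purely illustrative implementation of the proposed StandbyTaskUpdateListener.
public class LoggingStandbyTaskUpdateListener implements StandbyTaskUpdateListener {

    @Override
    public void onTaskCreated(final TopicPartition topicPartition,
                               final String storeName,
                               final long earliestOffset,
                               final long startingOffset,
                               final long currentEndOffset) {
        System.out.printf("standby %s/%s: starting at offset %d (changelog end %d)%n",
                topicPartition, storeName, startingOffset, currentEndOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored,
                                final long currentEndOffset) {
        // Keep this cheap: anything heavy here delays the next restore batch.
        System.out.printf("standby %s/%s: +%d records, remaining lag ~%d%n",
                topicPartition, storeName, numRestored, currentEndOffset - batchEndOffset);
    }

    @Override
    public void onTaskSuspended(final TopicPartition topicPartition,
                                final String storeName,
                                final long storeOffset,
                                final long currentEndOffset,
                                final SuspendReason reason) {
        System.out.printf("standby %s/%s: suspended (%s) at store offset %d%n",
                topicPartition, storeName, reason, storeOffset);
    }
}
```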



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-12-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15448.
-
Fix Version/s: 3.7.0
 Assignee: Colt McNealy
   Resolution: Fixed

> Streams StandbyTaskUpdateListener
> -
>
> Key: KAFKA-15448
> URL: https://issues.apache.org/jira/browse/KAFKA-15448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Colt McNealy
>Assignee: Colt McNealy
>Priority: Minor
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-988: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
>
>     public enum SuspendReason { MIGRATED, PROMOTED; }
>
>     /**
>      * Method called upon the creation of the Standby Task.
>      *
>      * @param topicPartition   the TopicPartition of the Standby Task.
>      * @param storeName        the name of the store being watched by this Standby Task.
>      * @param earliestOffset   the earliest offset available on the changelog topic.
>      * @param startingOffset   the offset from which the Standby Task starts watching.
>      * @param currentEndOffset the current latest offset on the associated changelog partition.
>      */
>     void onTaskCreated(final TopicPartition topicPartition,
>                        final String storeName,
>                        final long earliestOffset,
>                        final long startingOffset,
>                        final long currentEndOffset);
>
>     /**
>      * Method called after restoring a batch of records. The maximum size of the
>      * batch is whatever MAX_POLL_RECORDS is set to.
>      *
>      * This method is called after restoring each batch, and it is advised to keep
>      * processing to a minimum. Any heavy processing will hold up recovering the
>      * next batch, hence slowing down the restore process as a whole.
>      *
>      * If you need to do any extended processing or connect to an external
>      * service, consider doing so asynchronously.
>      *
>      * @param topicPartition   the TopicPartition containing the values to restore
>      * @param storeName        the name of the store undergoing restoration
>      * @param batchEndOffset   the inclusive ending offset for the current restored batch for this TopicPartition
>      * @param numRestored      the total number of records restored in this batch for this TopicPartition
>      * @param currentEndOffset the current end offset of the changelog topic partition
>      */
>     void onBatchRestored(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long batchEndOffset,
>                          final long numRestored,
>                          final long currentEndOffset);
>
>     /**
>      * Method called after a Standby Task is closed, either because the task
>      * migrated to a new instance or because the task was promoted to an Active task.
>      */
>     void onTaskSuspended(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long storeOffset,
>                          final long currentEndOffset,
>                          final SuspendReason reason);
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-12-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15448:

Labels: kip  (was: needs-kip)

> Streams StandbyTaskUpdateListener
> -
>
> Key: KAFKA-15448
> URL: https://issues.apache.org/jira/browse/KAFKA-15448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Colt McNealy
>Priority: Minor
>  Labels: kip
>
> KIP-988: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]
> In addition to the new metrics in KIP-869, it would be great to have a 
> callback that allows for monitoring of Standby Task status. The 
> StateRestoreListener is currently not called for Standby Tasks for good 
> reasons (the API wouldn't make sense for Standby). I've attached an interface 
> which would be nice to have:
>  
> ```
> public interface StandbyTaskUpdateListener {
>
>     public enum SuspendReason { MIGRATED, PROMOTED; }
>
>     /**
>      * Method called upon the creation of the Standby Task.
>      *
>      * @param topicPartition   the TopicPartition of the Standby Task.
>      * @param storeName        the name of the store being watched by this Standby Task.
>      * @param earliestOffset   the earliest offset available on the changelog topic.
>      * @param startingOffset   the offset from which the Standby Task starts watching.
>      * @param currentEndOffset the current latest offset on the associated changelog partition.
>      */
>     void onTaskCreated(final TopicPartition topicPartition,
>                        final String storeName,
>                        final long earliestOffset,
>                        final long startingOffset,
>                        final long currentEndOffset);
>
>     /**
>      * Method called after restoring a batch of records. The maximum size of the
>      * batch is whatever MAX_POLL_RECORDS is set to.
>      *
>      * This method is called after restoring each batch, and it is advised to keep
>      * processing to a minimum. Any heavy processing will hold up recovering the
>      * next batch, hence slowing down the restore process as a whole.
>      *
>      * If you need to do any extended processing or connect to an external
>      * service, consider doing so asynchronously.
>      *
>      * @param topicPartition   the TopicPartition containing the values to restore
>      * @param storeName        the name of the store undergoing restoration
>      * @param batchEndOffset   the inclusive ending offset for the current restored batch for this TopicPartition
>      * @param numRestored      the total number of records restored in this batch for this TopicPartition
>      * @param currentEndOffset the current end offset of the changelog topic partition
>      */
>     void onBatchRestored(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long batchEndOffset,
>                          final long numRestored,
>                          final long currentEndOffset);
>
>     /**
>      * Method called after a Standby Task is closed, either because the task
>      * migrated to a new instance or because the task was promoted to an Active task.
>      */
>     void onTaskSuspended(final TopicPartition topicPartition,
>                          final String storeName,
>                          final long storeOffset,
>                          final long currentEndOffset,
>                          final SuspendReason reason);
> }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15448) Streams StandbyTaskUpdateListener

2023-12-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15448:

Description: 
KIP-988: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Update+Listener]

In addition to the new metrics in KIP-869, it would be great to have a callback 
that allows for monitoring of Standby Task status. The StateRestoreListener is 
currently not called for Standby Tasks for good reasons (the API wouldn't make 
sense for Standby). I've attached an interface which would be nice to have:

 

```
public interface StandbyTaskUpdateListener {

    public enum SuspendReason { MIGRATED, PROMOTED; }

    /**
     * Method called upon the creation of the Standby Task.
     *
     * @param topicPartition   the TopicPartition of the Standby Task.
     * @param storeName        the name of the store being watched by this Standby Task.
     * @param earliestOffset   the earliest offset available on the changelog topic.
     * @param startingOffset   the offset from which the Standby Task starts watching.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     */
    void onTaskCreated(final TopicPartition topicPartition,
                       final String storeName,
                       final long earliestOffset,
                       final long startingOffset,
                       final long currentEndOffset);

    /**
     * Method called after restoring a batch of records. The maximum size of the
     * batch is whatever MAX_POLL_RECORDS is set to.
     *
     * This method is called after restoring each batch, and it is advised to keep
     * processing to a minimum. Any heavy processing will hold up recovering the
     * next batch, hence slowing down the restore process as a whole.
     *
     * If you need to do any extended processing or connect to an external
     * service, consider doing so asynchronously.
     *
     * @param topicPartition   the TopicPartition containing the values to restore
     * @param storeName        the name of the store undergoing restoration
     * @param batchEndOffset   the inclusive ending offset for the current restored batch for this TopicPartition
     * @param numRestored      the total number of records restored in this batch for this TopicPartition
     * @param currentEndOffset the current end offset of the changelog topic partition
     */
    void onBatchRestored(final TopicPartition topicPartition,
                         final String storeName,
                         final long batchEndOffset,
                         final long numRestored,
                         final long currentEndOffset);

    /**
     * Method called after a Standby Task is closed, either because the task
     * migrated to a new instance or because the task was promoted to an Active task.
     */
    void onTaskSuspended(final TopicPartition topicPartition,
                         final String storeName,
                         final long storeOffset,
                         final long currentEndOffset,
                         final SuspendReason reason);
}
```

  was:
In addition to the new metrics in KIP-869, it would be great to have a callback 
that allows for monitoring of Standby Task status. The StateRestoreListener is 
currently not called for Standby Tasks for good reasons (the API wouldn't make 
sense for Standby). I've attached an interface which would be nice to have:

 

```
public interface StandbyTaskUpdateListener {

    public enum SuspendReason { MIGRATED, PROMOTED; }

    /**
     * Method called upon the creation of the Standby Task.
     *
     * @param topicPartition   the TopicPartition of the Standby Task.
     * @param storeName        the name of the store being watched by this Standby Task.
     * @param earliestOffset   the earliest offset available on the changelog topic.
     * @param startingOffset   the offset from which the Standby Task starts watching.
     * @param currentEndOffset the current latest offset on the associated changelog partition.
     */
    void onTaskCreated(final TopicPartition topicPartition,
                       final String storeName,
                       final long earliestOffset,
                       final long startingOffset,
                       final long currentEndOffset);

    /**
     * Method called after restoring a batch of records. The maximum size of the
     * batch is whatever MAX_POLL_RECORDS is set to.
     *
     * This method is called after restoring each batch, and it is advised to keep
     * processing to a minimum. Any heavy processing will hold up recovering the
     * next batch, hence slowing down the restore process as a whole.
     *
     * If you need to do any extended processing or connect to an external
     * service, consider doing so asynchronously.
     *
     * @param topicPartition   the TopicPartition containing the values to restore
     * @param storeName        the name of the store undergoing restoration
     * @param batchEndOffset   the inclusive ending offset for the current restored batch for this TopicPartition
     * @param numRestored      the total number of records restored in this batch for this TopicPartition
     * @param currentEndOffset the current end offset of the changelog topic partition
     */
    void onBatchRestored(final TopicPartition topicPartition,
                         final String storeName,
                         final long batchEndOffset,
                         final long numRestored,
                         final long currentEndOffset);

    /**
     * Method called after a Standby Task is closed, either because the task
     * migrated to a new instance or because the task was promoted to an Active task.
     */
    void onTaskSuspended(final TopicPartition topicPartition,
                         final String storeName,
                         final long storeOffset,
                         final long currentEndOffset,
                         final SuspendReason reason);
}
```

[jira] [Commented] (KAFKA-16018) KafkaStreams can go into a zombie state if UncaughtExceptionHandler is specified via the deprecated method

2023-12-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798137#comment-17798137
 ] 

Matthias J. Sax commented on KAFKA-16018:
-

Seems this will "auto fix" in 4.0 when we remove the deprecated API?

> KafkaStreams can go into a zombie state if UncaughtExceptionHandler is 
> specified via the deprecated method
> --
>
> Key: KAFKA-16018
> URL: https://issues.apache.org/jira/browse/KAFKA-16018
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Tommy Becker
>Priority: Major
>
> We have a streams application in which all StreamThreads died due to a lack 
> of disk space. To our surprise, the KafkaStreams instance still reported its 
> state as running. Upon further investigation, it appears this is due to the 
> application setting an UncaughtExceptionHandler via the deprecated method 
> (this application was recently upgraded from 2.4.1): 
> [https://kafka.apache.org/33/javadoc/org/apache/kafka/streams/KafkaStreams.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)]
> The only way a StreamThread failure can cause the KafkaStreams instance to 
> transition to an error state now is via the new 
> StreamsUncaughtExceptionHandler machinery, but when an old 
> UncaughtExceptionHandler is set by the old method this code is effectively 
> bypassed.
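
[Editorial note: for reference, the non-deprecated path is the KIP-671 handler, which reports thread death to the KafkaStreams state machine and lets the application choose a reaction. A minimal sketch, assuming a topology and properties built elsewhere:]

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class HandlerWiring {
    // Wire the KIP-671 StreamsUncaughtExceptionHandler so a dying StreamThread
    // moves the client out of RUNNING instead of leaving a zombie instance.
    public static KafkaStreams buildStreams(final Topology topology, final Properties props) {
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.setUncaughtExceptionHandler(exception -> {
            // Other choices: REPLACE_THREAD, SHUTDOWN_APPLICATION.
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        return streams;
    }
}
```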



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9545.

Resolution: Fixed

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}
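
[Editorial note: the failure mode above is a polling assertion timing out. TestUtils.waitForCondition retries a condition until a deadline passes, roughly like this simplified sketch (not the actual Kafka test-util source; names and poll interval are illustrative):]

```java
// Simplified illustration of the retry-until-timeout pattern behind waitForCondition.
public final class WaitUtil {

    @FunctionalInterface
    public interface Condition {
        boolean met() throws Exception;
    }

    public static void waitForCondition(final Condition condition,
                                        final long maxWaitMs,
                                        final String details) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            try {
                if (condition.met()) {
                    return; // condition satisfied before the timeout
                }
            } catch (final Exception e) {
                // swallow and retry until the deadline, as the real helper does
            }
            Thread.sleep(100L); // poll interval between retries
        }
        throw new AssertionError("Condition not met within timeout " + maxWaitMs + ". " + details);
    }
}
```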



--
This message was sent by Atlassian Jira
(v8.20.10#820010)




[jira] [Updated] (KAFKA-15027) Implement rack aware assignment for standby tasks

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15027:

Labels: kip  (was: )

> Implement rack aware assignment for standby tasks
> -
>
> Key: KAFKA-15027
> URL: https://issues.apache.org/jira/browse/KAFKA-15027
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15024) Add cost function for task/client

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15024:

Labels: kip  (was: )

> Add cost function for task/client
> -
>
> Key: KAFKA-15024
> URL: https://issues.apache.org/jira/browse/KAFKA-15024
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15054) Add configs and logic to decide if rack aware assignment should be enabled

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15054:

Labels: kip  (was: )

> Add configs and logic to decide if rack aware assignment should be enabled
> --
>
> Key: KAFKA-15054
> URL: https://issues.apache.org/jira/browse/KAFKA-15054
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15025) Implement min-cost flow without balancing tasks for same subtopology

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15025:

Labels: kip  (was: )

> Implement min-cost flow without balancing tasks for same subtopology
> 
>
> Key: KAFKA-15025
> URL: https://issues.apache.org/jira/browse/KAFKA-15025
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax closed KAFKA-15026.
---

> Implement min-cost flow balancing tasks for same subtopology
> 
>
> Key: KAFKA-15026
> URL: https://issues.apache.org/jira/browse/KAFKA-15026
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15023) Get rack information for source topic partitions for a task

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15023:

Labels: kip  (was: )

> Get rack information for source topic partitions for a task
> ---
>
> Key: KAFKA-15023
> URL: https://issues.apache.org/jira/browse/KAFKA-15023
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15026:

Labels: kip  (was: )

> Implement min-cost flow balancing tasks for same subtopology
> 
>
> Key: KAFKA-15026
> URL: https://issues.apache.org/jira/browse/KAFKA-15026
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15026) Implement min-cost flow balancing tasks for same subtopology

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15026.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Implement min-cost flow balancing tasks for same subtopology
> 
>
> Key: KAFKA-15026
> URL: https://issues.apache.org/jira/browse/KAFKA-15026
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15022) Support rack aware task assignment in Kafka streams

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15022.
-
Fix Version/s: 3.7.0
   3.6.0
   Resolution: Fixed

> Support rack aware task assignment in Kafka streams 
> 
>
> Key: KAFKA-15022
> URL: https://issues.apache.org/jira/browse/KAFKA-15022
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip, kip-925
> Fix For: 3.7.0, 3.6.0
>
>
> For KIP-925: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams
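
[Editorial note: a minimal configuration sketch for enabling the KIP-925 rack-aware assignor. The config keys follow the KIP as shipped in 3.6, the numeric costs are illustrative rather than recommendations, and brokers must have broker.rack set for rack information to be available:]

```java
import java.util.Properties;

public class RackAwareConfig {
    // Enable rack-aware task assignment (KIP-925); the rack argument must match
    // the rack/zone this Streams instance actually runs in.
    public static Properties withRackAwareness(final Properties props, final String rack) {
        props.put("rack.aware.assignment.strategy", "min_traffic"); // "none" disables it
        props.put("rack.aware.assignment.traffic_cost", "10");      // weight of cross-rack traffic
        props.put("rack.aware.assignment.non_overlap_cost", "1");   // weight of moving tasks
        props.put("consumer.client.rack", rack);                    // rack reported by the consumers
        return props;
    }
}
```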



--
This message was sent by Atlassian Jira
(v8.20.10#820010)





