[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Priority: Major  (was: Blocker)

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Major
>  Labels: kip
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Fix Version/s: (was: 4.0.0)

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Blocker
>  Labels: kip
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839905#comment-17839905
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Well, in general we can, but the internal flag / config we set on the consumer 
is immutable – configs are in general immutable, so it's not something we can 
just change. Thus, to get some flexibility into `close()` we need a KIP to 
change the consumer.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-04-21 Thread Matthias J. Sax
Not sure either, but it sounds like a bug to me. Can you reproduce this 
reliably? What version are you using?


It would be best if you could file a Jira ticket and we can take it from 
there.



-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi,
I have an issue in kafka-streams while constructing kafka-streams state
store windows (TimeWindow and SessionWindow). While kafka-streams is
processing data, the kafka-streams process sometimes intermittently throws
the below error:
ThreadName:
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
  Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-0000000001
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
   at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
   at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
   at
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
   at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
   at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
  Caused by: java.lang.NullPointerException
   at
org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
   at
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
   ... 7 more
My understanding here is that the state store is null at the time
stateStore.flush() gets invoked to send the data to the state store, and this
leads to the above error. This error can be caught inside the kafka-streams
setUncaughtExceptionHandler:
   streams.setUncaughtExceptionHandler(throwable -> {
       LOGGER.error("Exception in streams", throwable);
       return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
   });
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in the Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen ?
I would like to express my gratitude in advance for any assistance provided.


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839441#comment-17839441
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

You are right that there is always a member-id etc. – I am not sure though if 
generating a random group.instance.id would be the right way forward.

Maybe making the Consumer#close() call flexible and allowing users to pass in a 
CloseOption, similar to what we do in KafkaStreams, would be the cleaner 
approach? An alternative might be (not sure what the exact scope would be) to 
add a new AdminClient method that allows passing in a `member.id` to remove a 
consumer from the group?

Another question is: is this "hack" we put into KS to not send a leave group 
request still relevant? A lot of things got improved on the rebalance protocol 
over the years, and it might not be necessary any longer?

Curious to hear what [~ableegoldman] and [~cadonna] think.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839440#comment-17839440
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

I see – this raises a few questions... Given that KIP-869 is not fully 
implemented yet, and the new metrics are not added, I am wondering whether we 
can consider the other metric effectively deprecated or not.

[~cadonna] WDYT? Should we push out KAFKA-16336 to the 5.0 release?

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax
One more thing. It might be good to clearly call out which interfaces a 
user would implement, vs. the other ones Kafka Streams implements and 
TaskAssignor only uses.


My understanding is, that users would implement `TaskAssignor`, 
`TaskAssignment`, and `StreamsClientAssignment`.


For `AssignedTask` it seems that users would actually only need to 
instantiate them. Should we add a public constructor?


Also wondering if we should add an empty default implementation for 
`onAssignmentComputed()` as it seems not to be strictly necessary to use 
this method?



-Matthias

On 4/19/24 7:30 PM, Matthias J. Sax wrote:

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same 
partition.assignor  prefix". What does this mean?



101 (nit) The KIP says: "Note that the thread-level assignment will 
remain an un-configurable internal implementation detail of the 
partition assignor (see "Rejected Alternatives" for further thoughts and 
reasoning)." -- When I was reading this the first time, I did not 
understand it, and it did only become clear later (eg while reading the 
discussion thread). I think it would be good to be a little bit more 
explicit, because this is not just some minor thing, but a core design 
decision (which I, btw, support).



102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 
'public' :)



104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems 
to be a little bit clumsy? I kinda like the original `finalAssignment()` 
-- I would also be happy with `onFinalAssignment` to address Bruno's 
line of thinking (which I think is a good call out). (Btw: 
`finalAssignment` is still used in the text on the KIP and should also 
be updated.)



105: Please remove all `private` variables. We should only show public 
stuff on the KIP. Everything else is an implementation detail.



106: `TaskAssignment#numStreamsClients()` -- why do we need this method? 
Seems calling `assignment()` gives us a collection and we can just call 
size() on it to get the same value? -- Also, why do we explicitly call 
out the overwrite of `toString()`; seems unnecessary?



107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the 
number of StreamThreads on this client, which is equal to the number of 
main consumers and represents its overall capacity." -- Given our 
planned thread refactoring, this might not hold true for long (and I 
am sure we will forget to update the JavaDocs later). Talking to Lucas, 
the plan is to cut down `StreamsThread` to host the consumer (and there 
will be only one, and it won't be configurable any longer), and we would 
introduce a number of configurable "processing threads". Can/should we 
build this API in a forward looking manner?



108: Why do we need 
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how 
this would be useful?



109 `StreamsClientState#consumers`: should we rename this to 
`#consumerClientIds()`?



110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc 
says 'owned by consumers on this node' -- Should we just say `owned by 
the Streams client`?



111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` 
parameter -- not clear what this is -- I guess it's a consumer's 
client.id? If yes, should we rename the parameter `consumerClientId`?



112 `ApplicationState`: what is the reason to have `allTasks()` and 
`statefulTasks()` -- why not have `statelessTasks()` and 
`statefulTasks()` instead? Or all three?



113 `ApplicationState#computeTaskLags()`: I understand the intent/reason 
why we have this one, but it seems to be somewhat difficult to use 
correctly, as it triggers an internal side-effect... Would it be 
possible to replace this method in favor of passing in a `boolean 
computeTaskLag` parameter into #streamClientState() instead, which might 
make it less error-prone to use, as it seems the returned 
`StreamsClient` object would be modified when calling #computeTaskLags() 
and thus both are related to each other?



114 nit/typo: `ApplicationState#streamsClientStates()` returns 
`StreamsClientState` not `StreamsClient`.



115 `StreamsAssignorRetryableException`: not sure if I fully understand 
the purpose of this exception.



116 "No actual changes to functionality": allowing to plug in customer 
TaskAssignor sounds like adding new functionality. Can we rephrase this?




117: What happens if the returned assignment is "invalid" -- for 
example, a task might not have been assigned, or is assigned to two 
nodes? Or a standby is assigned to the same node as its active? Or a 
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if 
this list of potential issues is complete or not...)




-Matthias



On 4/18/24 2:05 AM, Bruno 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-19 Thread Matthias J. Sax

Great KIP. I have some minor comments/questions:


100 The KIP says: "In the future, additional plugins can use the same 
partition.assignor  prefix". What does this mean?



101 (nit) The KIP says: "Note that the thread-level assignment will 
remain an un-configurable internal implementation detail of the 
partition assignor (see "Rejected Alternatives" for further thoughts and 
reasoning)." -- When I was reading this the first time, I did not 
understand it, and it did only become clear later (eg while reading the 
discussion thread). I think it would be good to be a little bit more 
explicit, because this is not just some minor thing, but a core design 
decision (which I, btw, support).



102 (nit/typo): taskAssignor -> TaskAssignor (somewhere in the text).


103 (nit): "new non-internal package" -> replace 'non-internal' with 
'public' :)



104: Method name `TaskAssignor#onAssignmentComputed()` -> the name seems 
to be a little bit clumsy? I kinda like the original `finalAssignment()` 
-- I would also be happy with `onFinalAssignment` to address Bruno's 
line of thinking (which I think is a good call out). (Btw: 
`finalAssignment` is still used in the text on the KIP and should also 
be updated.)



105: Please remove all `private` variables. We should only show public 
stuff on the KIP. Everything else is an implementation detail.



106: `TaskAssignment#numStreamsClients()` -- why do we need this method? 
Seems calling `assignment()` gives us a collection and we can just call 
size() on it to get the same value? -- Also, why do we explicitly call 
out the overwrite of `toString()`; seems unnecessary?



107 `StreamsClientState#numStreamThreads` JavaDocs says: "Returns the 
number of StreamThreads on this client, which is equal to the number of 
main consumers and represents its overall capacity." -- Given our 
planned thread refactoring, this might not hold true for long (and I 
am sure we will forget to update the JavaDocs later). Talking to Lucas, 
the plan is to cut down `StreamsThread` to host the consumer (and there 
will be only one, and it won't be configurable any longer), and we would 
introduce a number of configurable "processing threads". Can/should we 
build this API in a forward looking manner?



108: Why do we need 
`StreamsClientAssignment#followupRebalanceDeadline()` -- wondering how 
this would be useful?



109 `StreamsClientState#consumers`: should we rename this to 
`#consumerClientIds()`?



110 (nit) `StreamsClientState#previousl[Active|Standby]Tasks`: JavaDoc 
says 'owned by consumers on this node' -- Should we just say `owned by 
the Streams client`?



111 `StreamsClientState#prevTasksByLag()`: it takes a `String consumer` 
parameter -- not clear what this is -- I guess it's a consumer's 
client.id? If yes, should we rename the parameter `consumerClientId`?



112 `ApplicationState`: what is the reason to have `allTasks()` and 
`statefulTasks()` -- why not have `statelessTasks()` and 
`statefulTasks()` instead? Or all three?



113 `ApplicationState#computeTaskLags()`: I understand the intent/reason 
why we have this one, but it seems to be somewhat difficult to use 
correctly, as it triggers an internal side-effect... Would it be 
possible to replace this method in favor of passing in a `boolean 
computeTaskLag` parameter into #streamClientState() instead, which might 
make it less error-prone to use, as it seems the returned 
`StreamsClient` object would be modified when calling #computeTaskLags() 
and thus both are related to each other?



114 nit/typo: `ApplicationState#streamsClientStates()` returns 
`StreamsClientState` not `StreamsClient`.



115 `StreamsAssignorRetryableException`: not sure if I fully understand 
the purpose of this exception.



116 "No actual changes to functionality": allowing to plug in customer 
TaskAssignor sounds like adding new functionality. Can we rephrase this?




117: What happens if the returned assignment is "invalid" -- for 
example, a task might not have been assigned, or is assigned to two 
nodes? Or a standby is assigned to the same node as its active? Or a 
`StreamsClientAssignment` returns an unknown `ProcessId`? (Not sure if 
this list of potential issues is complete or not...)




-Matthias



On 4/18/24 2:05 AM, Bruno Cadonna wrote:

Hi Sophie,

Thanks for the clarifications!

(1)
What about replacing Node* with KafkaStreams* or StreamsClient*? I 
prefer KafkaStreams* since that class represents the Kafka Streams 
client. I am also fine with KafkaStreamsClient*. I really would like to 
avoid introducing a new term in Kafka Streams for which we already have 
an equivalent term even if it is used on the brokers since that is a 
different level of abstraction. Additionally, I have never been a big 
fan of the term "instance".


(4)
I think the question is if we need to retrieve assignment metadata by 
task for a Kafka client or if it is enough to iterate over the assigned 
tasks. Could you explain why we cannot add 

[jira] [Resolved] (KAFKA-16486) Integrate metric measurability changes in metrics collector

2024-04-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16486.
-
Fix Version/s: 3.8.0
   Resolution: Done

> Integrate metric measurability changes in metrics collector
> ---
>
> Key: KAFKA-16486
> URL: https://issues.apache.org/jira/browse/KAFKA-16486
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-04-18 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16586:
---

 Summary: Test TaskAssignorConvergenceTest failing
 Key: KAFKA-16586
 URL: https://issues.apache.org/jira/browse/KAFKA-16586
 Project: Kafka
  Issue Type: Test
  Components: streams, unit tests
Reporter: Matthias J. Sax


{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.
  at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
This might expose an actual bug (or incorrect test setup) and should be 
reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838829#comment-17838829
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Thanks for the input. I did not review/vote on the original KIP nor the PR. 
Thus, I just assumed there was some mention of static groups... As there is 
nothing about it in the KIP, as you pointed out, I did some digging, and the 
PR reveals why it's only implemented for static members: 
[https://github.com/apache/kafka/pull/12035#discussion_r858263213]

We use the admin client's "removeMembersFromConsumerGroup", which only works for 
static members, as it takes the consumer's `group.instance.id` as input. It seems 
it was a pragmatic approach... Re-reading the KIP discussion, it seems that 
making it work for regular members would require a change in the consumer API, 
and thus would have been a larger-scope KIP (and the idea was to keep the KIP 
scope limited).
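
For reference, a rough sketch of that admin-client call (bootstrap server, group id, and 
`group.instance.id` values are made-up examples, not taken from the PR):
{code:java}
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (Admin admin = Admin.create(props)) {
    // only works if the member registered with a group.instance.id, i.e., a static member
    admin.removeMembersFromConsumerGroup(
        "my-streams-application-id",
        new RemoveMembersFromConsumerGroupOptions(
            Collections.singleton(new MemberToRemove("my-group-instance-id")))
    ).all().get();
}
{code}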

Thus, while we might not need a KIP for Kafka Streams, we would need one for 
the consumer to allow KS to use this newly added API... In the meantime, we 
could still do a small PR to update the JavaDocs to call out the current 
limitation (which will not make it a public contract IMHO, so after we get a 
consumer KIP we can still address this limitation w/o another KIP).

Thoughts?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16573:

Priority: Minor  (was: Major)

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Minor
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here, a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This is also an issue for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838827#comment-17838827
 ] 

Matthias J. Sax commented on KAFKA-16573:
-

Thanks for filing this ticket. I think your idea is good; it's for sure an 
improvement over the current state.
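
For reference, a sketch (not part of the ticket) of how the missing serdes could be 
provided explicitly via Grouped in the Java DSL, which avoids the runtime ConfigException 
shown below:
{code:java}
builder
    .table("input", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy(
        (key, value) -> KeyValue.pair(value, key),
        // the Grouped argument is what was missing in the reported topology
        Grouped.with(Serdes.String(), Serdes.String()))
    .count()
    .toStream()
    .to("output", Produced.with(Serdes.String(), Serdes.Long()));
{code}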

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here, a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This is also an issue for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16280.
-
Resolution: Done

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16280) Expose method to determine Metric Measurability

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16280:

Issue Type: Improvement  (was: Bug)

> Expose method to determine Metric Measurability
> ---
>
> Key: KAFKA-16280
> URL: https://issues.apache.org/jira/browse/KAFKA-16280
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 3.8.0
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> The Jira is to track the development of KIP-1019: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1019%3A+Expose+method+to+determine+Metric+Measurability]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-04-18 Thread Matthias J. Sax

Thanks for the KIP Alieh! It addresses an important case for error handling.

I agree that using this handler would be an expert API, as mentioned by 
a few people. But I don't think it would be a reason to not add it. It's 
always a tricky tradeoff what to expose to users and to avoid foot guns, 
but we added similar handlers to Kafka Streams, and have good experience 
with it. Hence, I understand, but don't share the concern raised.


I also agree that there is some responsibility by the user to understand 
how such a handler should be implemented to not drop data by accident. 
But it seems unavoidable and acceptable.


While I understand that a "simpler / reduced" API (eg via configs) might 
also work, I personally prefer a full handler. Configs have the same 
issue that they could be misused, potentially leading to incorrectly 
dropped data, but at the same time are less flexible (and thus maybe 
even harder to use correctly...?). Based on my experience, there are also 
often weird corner cases for which it makes sense to also drop records for 
other exceptions, and a full handler has the advantage of full 
flexibility and "absolute power!".


To be fair: I don't know the exact code paths of the producer in 
detail, so please keep me honest. But my understanding is that the KIP 
aims to allow users to react to internal exceptions, and decide to keep 
retrying internally, swallow the error and drop the record, or raise the 
error?


Maybe the KIP would need to be a little bit more precise about which errors we 
want to cover -- I don't think this list must be exhaustive, as we can 
always do follow-up KIPs to also apply the handler to other errors to 
expand the scope of the handler. The KIP does mention examples, but it 
might be good to explicitly state for what cases the handler gets applied?


I am also not sure if CONTINUE and FAIL are enough options? Don't we 
need three options? Or would `CONTINUE` have a different meaning depending 
on the type of error? I.e., for a retryable error `CONTINUE` would mean 
keep retrying internally, but for a non-retryable error `CONTINUE` means 
swallow the error and drop the record? This semantic overload seems 
tricky for users to reason about, so it might be better to split `CONTINUE` 
into two cases -> `RETRY` and `SWALLOW` (or some better names).
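
Just to illustrate the idea (purely hypothetical sketch, names made up and not part of 
the KIP):

   enum ProducerExceptionHandlerResponse {
       FAIL,    // re-throw the error and stop
       RETRY,   // keep retrying internally (only meaningful for retryable errors)
       SWALLOW  // swallow the error and drop the record
   }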


Additionally, should we just ship a `DefaultClientExceptionHandler` 
which would return `FAIL`, for backward compatibility? Or not have any 
default handler to begin with and allow it to be `null`? I don't see the 
need for a specific `TransactionExceptionHandler`. To me, the goal 
should be to not modify the default behavior at all, but to just allow 
users to change the default behavior if there is a need.


What is missing from the KIP though is how the handler is passed into the 
producer. Would we need a new config which allows setting a 
custom handler? And/or would we allow passing in an instance via the 
constructor, or add a new method to set a handler?



-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do with 
additional
motivation. This would be an expert-level interface given how complicated
the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf.
The ProduceResponse can actually return an array of records which failed
per batch. If you get RecordTooLargeException, and want to retry, you probably
need to remove the offending records from the batch and retry it. This is 
getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wonder whether an
alternative might be to add a method to the existing Callback interface, such 
as:

   ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the
producer is going to retry. It tells the producer whether to go ahead with the 
retry
or not. The default implementation would be to CONTINUE, because that’s
just continuing to retry as planned. Note that this is a per-record callback, so
the application would be able to understand which records failed.

By using an existing interface, we already know how to configure it and we know
about the threading model for calling it.


Thanks,
Andrew




On 17 Apr 2024, at 18:17, Chris Egerton  wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is
particularly frustrating for users of Kafka Connect and has been the source
of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected
alternatives) section is that this kind of custom retry logic can't be
implemented by hand by, e.g., setting retries to 0 in the producer config
and handling 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Matthias J. Sax
Andrew, thanks for the details about Consumer internals. That's super 
useful for this discussion! -- And it confirms my understanding.


I don't think we want to use the ConsumerRecord type with deserialized key/value 
though, because for a DLQ the handler wants to write the message into some DLQ 
topic, and thus needs the raw key and value, so only a 
`ConsumerRecord` with `byte[]` (or maybe `ByteBuffer`) key and value types would work.


While I would be ok with using `ConsumerRecord`, I don't see a huge 
advantage compared to passing in all fields we are interested in 
one-by-one. In the end, if the data is written into a DLQ topic, the 
`ConsumerRecord` object cannot be reused (but the handler will build a 
`ProducerRecord`), and `ConsumerRecord` would "just" be a container -- I 
don't think it would simplify user-code or provide any other benefit, 
but just add an (unnecessary?) level of wrapping/indirection?
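
To make the DLQ point concrete, a rough sketch of what a handler body might do with the 
raw key/value/headers (the topic name, dlqProducer, and failed* variables are made-up 
placeholders, not from the KIP):

   ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
       "dlq-topic", null, failedTimestamp, failedKey, failedValue, failedHeaders);
   dlqProducer.send(dlqRecord);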


The only advantage I would see, is for the case that new interesting 
metadata fields get added to the message format -- for this case, using 
`ConsumerRecord` would automatically include these new fields, and we 
don't need to modify the exception class to add them explicitly. But as 
this happens very rarely, it does not seem to provide a huge benefit.


In the end, I would be fine either way. Curious to hear what others think.


-Matthias



On 4/18/24 8:41 AM, Andrew Schofield wrote:

Hi,
Thanks for the KIP. I think it’s an interesting idea and it seems to work 
nicely with how
the clients work today.

Recently, I’ve been digging in to the record deserialization code in the 
consumer as
part of implementing KIP-932. It’s pretty nasty in there.

There are essentially two kinds of problems that can be encountered when
converting the response from a Fetch RPC into ConsumerRecords.

1) The batch might be corrupt, so you can’t even work out what records are in 
there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until
it hits a record which it cannot deserialize. If it has any parsed records 
already to return
to the application, it parks the RecordDeserializationException and returns the 
records
so far. Then, on the next poll, because there are no parsed records waiting, it 
throws
the RecordDeserializationException. And so it continues until all fetched 
records are
processed.

I don’t really think of org.apache.kafka.common.record.Record as an external 
interface.
I think it’s a bit close to the wire to be putting on a user-facing interface.

We do know almost everything about the bad record at the point where the
deserialization fails. I wonder whether actually we could use
ConsumerRecord<byte[], byte[]> or even ConsumerRecord<Object, Object> :)
in the constructor for the RecordDeserializationException. I don’t really like 
having
a long list of each of the individual items of the record’s parts like 
timestamp and so on.
It’s nicer to have an interface to the record that we can evolve without having 
to change
the constructor for this exception.

Thanks,
Andrew


On 18 Apr 2024, at 15:13, Frédérik Rouleau  
wrote:

Hi,

But I guess my main question is really about what metadata we really

want to add to `RecordDeserializationException`? `Record` expose all
kind of internal (serialization) metadata like `keySize()`,
`valueSize()` and many more. For the DLQ use-case it seems we don't
really want any of these? So I am wondering if just adding
key/value/ts/headers would be sufficient?



I think that key/value/ts/headers, topicPartition and offset are all we
need. I do not see any usage for other metadata. If someone has a use case,
I would like to know it.

So in that case we can directly add the data into the exception. We can
keep ByteBuffer for the local field instead of byte[], that will avoid
memory allocation if users do not require it.
I wonder if we should return the ByteBuffer or directly the byte[] (or both
?) which is more convenient for end users. Any thoughts?
Then we can have something like:

public RecordDeserializationException(TopicPartition partition,
 long offset,
 ByteBuffer key,
 ByteBuffer value,
 Header[] headers,
 long timestamp,
 String message,
 Throwable cause);

public TopicPartition topicPartition();

public long offset();

public long timestamp();

public byte[] key(); // Will allocate the array on call

public byte[] value(); // Will allocate the array on call

public Header[] headers();



Regards,
Fred




Re: Streams group final result: EmitStrategy vs Suppressed

2024-04-18 Thread Matthias J. Sax
The main difference is the internal implementation. Semantically, both 
are equivalent.


suppress() uses an in-memory buffer, while `emitStrategy()` does not, 
but modifies the upstream aggregation operator impl, and waits to send 
results downstream, and thus, it's RocksDB based.
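
For illustration, a sketch of the two forms (assuming a windowed count; example only, 
not from the original question):

   // (1) suppress() on the result KTable: final results are held in an in-memory buffer
   windowedStream.count()
       .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

   // (2) emitStrategy() on the windowed stream: the aggregation itself holds results
   //     back until the window closes (RocksDB based)
   windowedStream.emitStrategy(EmitStrategy.onWindowClose())
       .count();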



-Matthias


On 4/12/24 10:37 AM, Ayoub wrote:

Hello,

*[Not sure if my email went through as I was not subscribed to this mailing
list. Here is my original email]*

I found that there are two ways to send only the final result of a windowed
groupBy, either using Suppressed.untilWindowCloses on the final KTable or
EmitStrategy on the windowed stream.

I tried to compare both but didn't find differences in the result they give.

Are there any differences apart from the moment they are defined within the
pipeline. And is there any preference on using one or the other ?

Thanks,
Ayoub


Le ven. 12 avr. 2024 à 11:50, Ayoub  a écrit :


Hello,

I found that there are two ways to send only the final result of a
windowed groupBy, either using Suppressed.untilWindowCloses on the final
KTable or EmitStrategy on the windowed stream.

I tried to compare both but didn't find differences in the result they
give.

Are there any differences apart from the moment they are defined within
the pipeline. And Is there any preference on using one or the other ?

Thanks,
Ayoub





Re: Is there any recommendation about header max size?

2024-04-18 Thread Matthias J. Sax
I don't think that there is any specific recommendation. However, there 
is an overall max-message-size config that you need to keep in mind.
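
Concretely, the limits I mean are along these lines (example value only; the actual 
numbers depend on your setup):

   // producer-side cap the full serialized record (key + value + headers) must fit under
   producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
   // plus the broker-level message.max.bytes and topic-level max.message.bytes limits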


-Matthias

On 4/16/24 9:42 AM, Gabriel Giussi wrote:

I have logic in my service to capture exceptions being thrown during
message processing and produce a new message to a different topic with
information about the error. The idea is to leave the message unmodified,
aka produce the exact same bytes to this new topic, therefore I'm planning
on adding the java exception as a header.
By looking at the documentation it is just an array of bytes and it doesn't
say anything about a max size but is there any recommendation about it?
https://kafka.apache.org/documentation/#recordheader



[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838730#comment-17838730
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

Thanks for raising this ticket. Wondering how we could address it though... 
Given that the "contract" is that the record key is not modified, but there is 
no input record, how could the key be set in a meaningful way? – The only thing 
I can think of right now would be to set `key = null`, but it's still 
semantically questionable...

Can you provide more details what you are trying to do?
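
For context, a sketch of the usual workaround (not from this ticket): with the plain 
Processor API (instead of FixedKeyProcessor), a new Record can be created and forwarded 
from a punctuator, with the key chosen by the processor itself:
{code:java}
public class PunctuatingProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp ->
            // no input record exists here, so the processor picks the key itself
            context.forward(new Record<>("some-key", "some-value", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record);
    }
}
{code}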

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can only be created based on existing 
> records. But such a record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838728#comment-17838728
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

Why is this ticket marked as "blocker" for 4.0 release?

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Component/s: streams

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-18 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Labels: kip  (was: )

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838727#comment-17838727
 ] 

Matthias J. Sax commented on KAFKA-16263:
-

Yes, these are the handlers this ticket refers to.

> Add Kafka Streams docs about available listeners/callback
> -
>
> Key: KAFKA-16263
> URL: https://issues.apache.org/jira/browse/KAFKA-16263
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>            Reporter: Matthias J. Sax
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: beginner, newbie
>
> Kafka Streams allows registering all kinds of listeners and callbacks (e.g., 
> uncaught-exception-handler, restore-listeners, etc.) but those are not in the 
> documentation.
> A good place might be 
> [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16336) Remove Deprecated metric standby-process-ratio

2024-04-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838724#comment-17838724
 ] 

Matthias J. Sax commented on KAFKA-16336:
-

The next planned release is 3.8, but we can work on this ticket only for 4.0, 
as it's a breaking change that's only allowed for a major release. – So this 
ticket cannot be picked up yet.

> Remove Deprecated metric standby-process-ratio
> --
>
> Key: KAFKA-16336
> URL: https://issues.apache.org/jira/browse/KAFKA-16336
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Walter Hernandez
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Metric "standby-process-ratio" was deprecated in 3.5 release via 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-18 Thread Matthias J. Sax

Hi,

I am actually not sure if using `Record` is really the right thing? 
While `Record` is technically public API, it does not seem to be 
intended to be exposed to end users?


But I guess my main question is really about what metadata we really 
want to add to `RecordDeserializationException`? `Record` expose all 
kind of internal (serialization) metadata like `keySize()`, 
`valueSize()` and many more. For the DLQ use-case it seems we don't 
really want any of these? So I am wondering if just adding 
key/value/ts/headers would be sufficient?
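
Just to illustrate that alternative (a purely hypothetical shape, not what the KIP currently proposes), exposing only the raw metadata could look roughly like this:

import java.nio.ByteBuffer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;

// Hypothetical sketch only -- illustrates the "plain key/value/timestamp/headers
// getters" alternative mentioned above; names and fields are not part of any KIP.
public class RecordDeserializationException extends SerializationException {
    private final TopicPartition partition;
    private final long offset;
    private final long timestamp;
    private final ByteBuffer key;
    private final ByteBuffer value;
    private final Headers headers;

    public RecordDeserializationException(final TopicPartition partition, final long offset,
                                          final long timestamp, final ByteBuffer key,
                                          final ByteBuffer value, final Headers headers,
                                          final String message, final Throwable cause) {
        super(message, cause);
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
        this.headers = headers;
    }

    public TopicPartition topicPartition() { return partition; }
    public long offset() { return offset; }
    public long timestamp() { return timestamp; }
    public ByteBuffer key() { return key; }
    public ByteBuffer value() { return value; }
    public Headers headers() { return headers; }
}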


The motivation section of the KIP is somewhat sparse about DLQ details, 
so it's hard to judge what is needed / useful and what would be a leaky 
abstraction?


About "when we cannot construct a `ConsumerRecord` -- I was not really 
digging into it until know, and was just following Kirks comment 
blindly. But looking into the code, we would only not be able to 
construct a `CosumerRecord` when either key or value deserialization 
fails? But as we would pass in byte[] type it would not matter. -- Kirk 
did also mention a corrupted batch, but it seems for this case we might 
not even hit the deserialization code path, but would error out earlier?


I was also looking into the build setup, and I think the idea of the 
import control is to have some sanity check about import dependencies. I 
currently don't see why we should not add an allow rule for Record.


But if we decide to not pass in Record/ConsumerRecord both questions are 
void anyway. Of course, for this case, we would need to add a getter 
method for each metadata field we add (but I think that would be totally 
ok?)


I also see now that the old constructor is deprecated, and thus I 
think using `Optional` as a return type is not required (already reflected 
on the wiki page).


Bottom line seems to be: the motivation about what metadata is needed 
for the DLQ use-case is not described in much detail and thus it's hard 
to judge what the right design might be?


The wiki account thing is unfortunately nothing we can fix on our side. 
We did file a ticket with INFRA team, but need to wait for them to 
address it... In the meantime, if you can provide the missing 
information, and what you want to get edited, I can help to update the 
wiki page accordingly.



-Matthias

On 4/16/24 11:18 AM, Sophie Blee-Goldman wrote:

Also ignore everything I said about Streams earlier. I didn't look closely
enough on my first pass over the KIP and thought this was changing the
DeserializationExceptionHandler in Streams. I see now that this is
actually about the consumer client's DeserializationException so everything
I said about using a ByteArray Deserialization and Kafka Streams doesn't
apply here. The important thing is just that it still deserializes one
record
at a time, and essentially throws this when it fails to convert the Record
type into a ConsumerRecord type. So there's always only one record
at a time to consider.

Sorry for any confusion I caused

On Tue, Apr 16, 2024 at 11:15 AM Sophie Blee-Goldman 
wrote:


Ah, thanks for the additional context. I should have looked at the code
before I opened my mouth (so to speak)

In that case, I fully agree that using Record instead of ConsumerRecord
makes sense. It does indeed seem like by definition, if there is a
DeserializationException then there is no ConsumerRecord since this
is where/how it gets thrown:

try {
...
return new ConsumerRecord<>(...);
} catch (RuntimeException e) {
...
throw new RecordDeserializationException(...);
}

As you mentioned the Record is an input to the method so we definitely have
one of those, and imo, it makes sense to use. As far as I can tell it's
just
a regular public interface so exposing it shouldn't be an issue just based
on
the class itself. But I'm still a bit concerned about the checkstyle
complaint.

I'll try to find someone who can explain why or if we should avoid
returning
a Record type here. Other than that, I'd say the KIP LGTM as-is and we
could kick off voting

On Tue, Apr 16, 2024 at 10:47 AM Frédérik Rouleau
 wrote:


Thanks Sophie,

I can write something in the KIP on how KStreams solves that issue, but as
I can't create a Wiki account, I will have to find someone to do this on
my
behalf (if someone can work on solving that wiki account creation, it
would
be great).

The biggest difference between Record and ConsumerRecord is that data are
stored respectively using ByteBuffer and Byte array.

For the Record option, the object already exists in the parsing method, so
it's roughly just a parameter type change in the Exception. The point is
just about exposing the Record class externally. By the way, the name
Record is also making some IDEs a bit crazy by confusing it with the new
Java Record feature. An alternative could be to create another wrapper
type, or just include key and value ByteBuffer in the
RecordDeserializationException itself.

For the ConsumerRecord option, it requires allocating byte arrays, even

Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-18 Thread Matthias J. Sax

Congrats Greg!

On 4/15/24 10:44 AM, Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote:

Congrats! Well deserved

From: d...@kafka.apache.org At: 04/13/24 14:42:22 UTC-4:00To:  
d...@kafka.apache.org
Subject: [ANNOUNCE] New Kafka PMC Member: Greg Harris

Hi all,

Greg Harris has been a Kafka committer since July 2023. He has remained
very active and instructive in the community since becoming a committer.
It's my pleasure to announce that Greg is now a member of Kafka PMC.

Congratulations, Greg!

Chris, on behalf of the Apache Kafka PMC





[jira] [Created] (KAFKA-16575) Automatically remove KTable aggregation result when group becomes empty

2024-04-17 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16575:
---

 Summary: Automatically remove KTable aggregation result when group 
becomes empty
 Key: KAFKA-16575
 URL: https://issues.apache.org/jira/browse/KAFKA-16575
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Using `KTable.groupBy(...).aggregate(...)` can handle updates (inserts, 
deletes, actual updates) of the input KTable, by calling the provided `Adder` 
and `Subtractor`. However, when all records from the input table (which map to 
the same group/row in the result table) get removed, the result entry is not 
removed automatically.

For example, if we implement a "count", the count would go to zero for a group 
by default, instead of removing the row from the result, if all input record 
for this group got deleted.

Users can let their `Subtractor` return `null` for this case, to actually 
delete the row, but it's not well documented and it seems it should be a 
built-in feature of the table-aggregation to remove "empty groups" from the 
result, instead of relying on "correct" behavior of user-code.

(Also the built-in `count()` does not return `null`, but actually zero...)

An internal counter of how many elements are in a group should be sufficient. Of 
course, there are backward compatibility questions we need to answer.
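
For reference, a minimal sketch of the current workaround (illustrative only; assumes a simple String-valued input topic and a count stored as Long):

{code:java}
// Workaround sketch: the subtractor returns null once the group becomes empty,
// which deletes the result row instead of keeping a zero count.
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> input =
    builder.table("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

final KTable<String, Long> counts = input
    .groupBy((key, value) -> KeyValue.pair(value, value),
             Grouped.with(Serdes.String(), Serdes.String()))
    .aggregate(
        () -> 0L,
        (key, newValue, agg) -> agg + 1L,          // adder
        (key, oldValue, agg) -> {                  // subtractor
            final long updated = agg - 1L;
            return updated == 0L ? null : Long.valueOf(updated);
        },
        Materialized.with(Serdes.String(), Serdes.Long()));
{code}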



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836671#comment-17836671
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

CloseOption was introduced via 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]

The reasoning about the design should be on the KIP and corresponding DISCUSS 
thread.

I agree that the JavaDocs are missing a lot of information. Would you be 
interested to do a PR to improve the JavaDocs?

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the 
processor node id? Isn't this internal (I could not even find it quickly)? 
We do have a processor name, right? Or am I mixing something up?


Another question is about `ProcessingContext` -- it contains a lot of 
(potentially irrelevant?) metadata. We should think carefully about what 
we want to pass in and what not -- removing stuff is hard, but adding 
stuff is easy. It's always an option to create a new interface that only 
exposes stuff we find useful, and allows us to evolve this interface 
independently of others. Re-using an existing interface always has the 
danger of introducing an undesired coupling that could bite us in the 
future. -- It makes total sense to pass in `RecordMetadata`, but 
`ProcessingContext` (even if already limited compared to 
`ProcessorContext`) still seems to be too broad? For example, there are 
`getStateStore()` and `schedule()` methods which I think we should not 
expose.
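
Just to make the idea concrete (purely hypothetical sketch; names are up for debate and this is not an existing interface), such a narrow read-only view could be as small as:

import java.util.Optional;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.RecordMetadata;

// Hypothetical sketch only: expose just the metadata a handler likely needs,
// instead of the full ProcessingContext (no getStateStore(), no schedule()).
public interface ErrorHandlerContext {
    String applicationId();
    TaskId taskId();
    Optional<RecordMetadata> recordMetadata();   // topic / partition / offset, if available
}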


The other interesting question is about "what record gets passed in". 
For the PAPI, passing in the Processor's input record makes a lot of 
sense. However, for DSL operators, I am not 100% sure? The DSL often 
uses internal types not exposed to the user, and thus I am not sure if 
users could write useful code for this case? -- In general, I still 
agree that the handler should be implemented with a try-catch around 
`Processor.process()`, but it might not be too useful for DSL processors. 
Hence, I am wondering if we need to do something more in the DSL? I 
don't have a concrete proposal (a few high level ideas only) and if we 
don't do anything special for the DSL I am ok with moving forward with 
this KIP as-is, but we should be aware of potential limitations for DSL 
users. We can always do a follow up KIP to close gaps when we understand 
the impact better -- covering the DSL would also expand the scope of 
this KIP significantly...


About the metric: just to double check -- do we think it's worth adding a 
new metric? Or could we re-use the existing "dropped records" metric?




-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We update the KIP

regards

Sébastien *VIALE***

*MICHELIN GROUP* - InfORMATION Technology
*Technical Expert Kafka*

  Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand


*De :* Bruno Cadonna 
*Envoyé :* mercredi 10 avril 2024 10:38
*À :* dev@kafka.apache.org 
*Objet :* [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occuring during processing
Warning External sender Do not click on any links or open any 
attachments unless you trust the sender and know the content is safe.


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive
Record because a Record is passed to
the processor in the following code line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152
 


I see that we do not need to pass into the handler a Record just because we do that for the DeserializationExceptionHandler
and the ProductionExceptionHandler. When those two handlers are called,
the record is already serialized. This is not the case for the
ProcessingExceptionHandler. However, I would propose to use Record
for the record that is passed to the ProcessingExceptionHandler because
it makes the handler API more flexible.


Best,
Bruno

This email was screened for spam and malicious content but exercise 
caution anyway.





On 4/9/24 9:09 PM, Loic Greffier wrote:
 > Hi Bruno and Bill,
 >
 > To complete Damien's points about point 3.
 >
 > Processing errors are caught and handled by the ProcessingErrorHandler, at the precise moment when records are processed by processor nodes. The handling will be performed in the "process" method of the ProcessorNode, such as:
 >
 > public void process(final Record record) {
 > ...
 >
 > try {
 > ...
 > } catch (final ClassCastException e) {
 > ...
 > } catch (Exception e) {
 > ProcessingExceptionHandler.ProcessingHandlerResponse response = this.processingExceptionHandler
 > .handle(internalProcessorContext, (Record) record, e);
 >
 > if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
 > throw new StreamsException("Processing exception handler is set to fail upon" +
 > " a processing error. If you would rather have the streaming pipeline" +
 > " continue after a processing error, please set the " +
 > DEFAULT_PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
 > e);
 > }
 > }
 > }
 > As you can 

Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-04-10 Thread Matthias J. Sax

Thanks for the KIP Fred.

Couple of nits: it's not clear from the "Public API" section what is new 
and what is existing API w/o going back to the code. For existing 
methods which are not changed, it's also best to actually omit them. -- 
It would also be best to only put the interface itself down, but not the 
implementation (ie, no private members and no method body).


Thus, it might be better to do something like this:

+

public class RecordDeserializationException extends SerializationException {

   // newly added
   public RecordDeserializationException(TopicPartition partition,
                                         ConsumerRecord<byte[], byte[]> record,
                                         String message,
                                         Throwable cause);

   public ConsumerRecord<byte[], byte[]> getConsumerRecord();
}

+

 From the description it's not clear to me if you propose to change the 
existing constructor, or propose to add a new constructor. From a 
compatibility POV, we cannot really change the existing constructor (but 
we could deprecate it and remove it in the future, and add a new one in 
parallel). But I also agree with Kirk that there could be cases for 
which we cannot pass in a `ConsumerRecord`, and thus keeping the old 
constructor could make sense (and change the new getter to return an 
`Optional`).


Another small thing: in Kafka, getter methods are not using a `get` 
prefix, and thus it should be `consumerRecord()` w/o the "get".




-Matthias


On 4/10/24 4:21 PM, Kirk True wrote:

Hi Fred,

Thanks for the KIP!

Questions/comments:

How do we handle the case where CompletedFetch.parseRecord isn’t able to 
construct a well-formed ConsumerRecord (i.e. the values it needs are 
missing/corrupted/etc.)?
Please change RecordDeserializationException’s getConsumerRecord() method to be 
named consumerRecord() to be consistent.
Should we change the return type of consumerRecord() to be 
Optional<ConsumerRecord<byte[], byte[]>> in the cases where even a “raw” 
ConsumerRecord can’t be created?
To avoid the above, does it make sense to include a Record object instead of a 
ConsumerRecord? The former doesn’t include the leaderEpoch or TimestampType, 
but maybe that’s OK?

Thanks,
Kirk


On Apr 10, 2024, at 8:47 AM, Frédérik Rouleau  
wrote:

Hi everyone,

To make implementation of DLQ in consumer easier, I would like to add the
raw ConsumerRecord into the RecordDeserializationException.

Details are in KIP-1036

.

Thanks for your feedback.

Regards,
Fred





[jira] [Created] (KAFKA-16508) Infinite loop if output topic does not exist

2024-04-10 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16508:
---

 Summary: Infinite loop if output topic does not exist
 Key: KAFKA-16508
 URL: https://issues.apache.org/jira/browse/KAFKA-16508
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams supports `ProductionExceptionHandler` to drop records on error 
when writing into an output topic.

However, if the output topic does not exist, the corresponding error cannot be 
skipped over because the handler is not called.

The issue is that the producer internally retries to fetch the output topic 
metadata until it times out, and a `TimeoutException` (which is a 
`RetriableException`) is returned via the registered `Callback`. However, for 
`RetriableException` there is a different code path and the 
`ProductionExceptionHandler` is not called.

In general, Kafka Streams correctly tries to handle as many errors as possible 
internally, and a `RetriableError` falls into this category (and thus there is 
no need to call the handler). However, for this particular case, just retrying 
does not solve the issue – it's unclear if throwing a retryable 
`TimeoutException` is actually the right thing to do for the Producer? Also not 
sure what the right way to address this ticket would be (currently, we cannot 
really detect this case, except if we would do some nasty error message String 
comparison, which sounds hacky...)
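
For reference, a minimal handler sketch (illustrative only) – note that, as described above, it is not invoked in this scenario because the error surfaces as a retriable `TimeoutException`:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Sketch of a handler that skips records on (non-retriable) send errors;
// registered via default.production.exception.handler.
public class DropOnSendErrorHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE; // drop the record and keep going
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}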



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16505:

Component/s: streams

> KIP-1034: Dead letter queue in Kafka Streams
> 
>
> Key: KAFKA-16505
> URL: https://issues.apache.org/jira/browse/KAFKA-16505
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Priority: Major
>
> See KIP: KIP-1034: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16505:

Labels: KIP  (was: )

> KIP-1034: Dead letter queue in Kafka Streams
> 
>
> Key: KAFKA-16505
> URL: https://issues.apache.org/jira/browse/KAFKA-16505
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Priority: Major
>  Labels: KIP
>
> See KIP: KIP-1034: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16502:

Component/s: streams
 unit tests

> Fix flaky 
> EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
> --
>
> Key: KAFKA-16502
> URL: https://issues.apache.org/jira/browse/KAFKA-16502
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Expected ERROR state but driver is on RUNNING ==> expected:  but was: 
> 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350)
>   at 
> app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at 
> java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16502) Fix flaky EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore

2024-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16502:

Labels: flaky-test  (was: )

> Fix flaky 
> EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore
> --
>
> Key: KAFKA-16502
> URL: https://issues.apache.org/jira/browse/KAFKA-16502
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>  Labels: flaky-test
>
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Expected ERROR state but driver is on RUNNING ==> expected:  but was: 
> 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
>   at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396)
>   at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at 
> app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350)
>   at 
> app//org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore(EOSUncleanShutdownIntegrationTest.java:169)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at 
> java.base@11.0.16.1/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834456#comment-17834456
 ] 

Matthias J. Sax commented on KAFKA-16478:
-

\cc [~mimaison] – seems you removed some older releases from the download path 
(3.4.1, 3.5.0, 3.5.1, and 3.6.0). Let's check that all links for these releases 
point to "archive".

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16478 ]


Matthias J. Sax deleted comment on KAFKA-16478:
-

was (Author: mjsax):
\cc [~omkreddy] [~stanislavkozlovski] as last RMs – did any of you move 
artifacts and forgot to update the links?

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834452#comment-17834452
 ] 

Matthias J. Sax commented on KAFKA-16478:
-

\cc [~omkreddy] [~stanislavkozlovski] as last RMs – did any of you move 
artifacts and forgot to update the links?

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken

2024-04-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834451#comment-17834451
 ] 

Matthias J. Sax commented on KAFKA-16478:
-

[~der_eismann] – Thanks for reporting this. Do you want to do a PR to fix it? 
Should be simple? Just need a small update to 
[https://github.com/apache/kafka-site/blob/asf-site/downloads.html] 

> Links for Kafka 3.5.2 release are broken
> 
>
> Key: KAFKA-16478
> URL: https://issues.apache.org/jira/browse/KAFKA-16478
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 3.5.2
>Reporter: Philipp Trulson
>Priority: Major
>
> While trying to update our setup, I noticed that the download links for the 
> 3.5.2 links are broken. They all point to a different host and also contain 
> an additional `/kafka` in their URL. Compare:
> not working:
> [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html]
> working:
> [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html]
> [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html]
> This goes for all links in the release - archives, checksums, signatures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Fix slow processing rate in Kafka streams

2024-04-05 Thread Matthias J. Sax

Perf tuning is always tricky... 350 rec/sec sounds pretty low though.

You would first need to figure out where the bottleneck is. Kafka 
Streams exposes all kind of metrics: 
https://kafka.apache.org/documentation/#kafka_streams_monitoring


Might be good to inspect them as a first step -- maybe something is off 
and gives a first direction.


In general, it would be good to narrow it down to Kafka network I/O, local 
RocksDB disk I/O, or CPU utilization -- each one could be the bottleneck, 
and we would first need to know which one before taking any action or 
changing configurations.
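
As a concrete starting point, you could also dump all metrics programmatically (just a sketch; assumes your `KafkaStreams` instance is called `streams`):

// Requires: java.util.Map, org.apache.kafka.common.Metric, org.apache.kafka.common.MetricName
final Map<MetricName, ? extends Metric> metrics = streams.metrics();
metrics.forEach((name, metric) ->
    System.out.println(name.group() + " / " + name.name() + " " + name.tags()
        + " = " + metric.metricValue()));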


HTH.

-Matthias

On 4/4/24 7:21 PM, Nirmal Das wrote:

Hi All,

My streams application is not processing more than 350 records/sec on a
high load of 3 million records produced every 2-3 minutes.

My scenarios are as below -
I am on Kafka and Streams version 3.5.1.
My key-value pair is in protobuf format.
I do a groupByKey followed by a TimeWindow of 10 mins with a grace period of 6
hours. It is then followed by an aggregate function which stores the first
and last offset of the record along with the partition for that message key.

Am I doing something wrong? Am I doing something anti-pattern which is
throttling the system ? How can I improve this?

Regards,
Dev Lover



[jira] [Commented] (KAFKA-16262) Add IQv2 to Kafka Streams documentation

2024-04-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834123#comment-17834123
 ] 

Matthias J. Sax commented on KAFKA-16262:
-

Of course you can work on this ticket.

Did you already read [https://kafka.apache.org/contributing] as a starting 
point?

For this ticket, you would need to do a PR on Github. The docs are in the Kafka 
repo: [https://github.com/apache/kafka/tree/trunk/docs]

Are you familiar with IQv2 and IQv1, and could you extend the docs to explain IQv2?
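
For example, a point lookup with IQv2 looks roughly like this (sketch only; assumes a running KafkaStreams instance and a key-value store materialized as "counts-store"):

{code:java}
// IQv2 sketch: query a single key from a materialized key-value store.
StateQueryRequest<Long> request =
    StateQueryRequest.inStore("counts-store")
        .withQuery(KeyQuery.<String, Long>withKey("some-key"));

StateQueryResult<Long> result = kafkaStreams.query(request);
Long value = result.getOnlyPartitionResult().getResult();
{code}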

> Add IQv2 to Kafka Streams documentation
> ---
>
> Key: KAFKA-16262
> URL: https://issues.apache.org/jira/browse/KAFKA-16262
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>            Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, newbie
>
> The new IQv2 API was added many releases ago. While it is still not feature 
> complete, we should add it to the docs 
> ([https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html])
>  to make users aware of the new API so they can start to try it out, report 
> issues and provide feedback / feature requests.
> We might still state that IQv2 is not yet feature complete, but should change 
> the docs in a way that positions it as the "new API", and have code examples.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: outerJoin confusion

2024-04-04 Thread Matthias J. Sax

Yeah, that is some quirk of KS runtime...

There is some internal config (for perf reasons) that delays emitting 
results... An alternative to advancing wall-clock time would be to set 
this internal config to zero, to disable the delay.


Maybe we should automatically disable this config when the topology test 
driver is used... It's not the first time it came up.


I opened a PR for it: https://github.com/apache/kafka/pull/15660


-Matthias



On 4/3/24 3:52 PM, Chad Preisler wrote:

Changing the code to this...

assertTrue(outputTopic.isEmpty());
 testDriver.advanceWallClockTime(Duration.ofMillis(2001));
 leftTopic.pipeInput("1", "test string 3", 4002L);
 testDriver.advanceWallClockTime(Duration.ofMillis(2001));
 leftTopic.pipeInput("1", "test string 4", 6004L);

Did appear to fix the issue. Output:

First join result:
Key: 1 Value: test string 1, null
Second join result:
Key: 1 Value: test string 2, null
Key: 1 Value: test string 3, null

Still a little strange that it works the first time without advancing the
wall clock.

On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <
shashwat.pandey@gmail.com> wrote:


I believe you need to advanceWallClockTime

https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-


Regards,
Shashwat Pandey


On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler 
wrote:


Seems like there is some issue with the TopologyTestDriver. I am able to
run the same stream against Kafka and I'm getting the output I expect. I'd
appreciate it if someone could confirm that there is an issue with the
TopologyTestDriver. If there is, any suggestions on how to test this type
of join?

On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler 
wrote:


Hello,

I'm confused about the outerJoin and when records are produced with the
following code.

Topology buildTopology() {
    var builder = new StreamsBuilder();
    var leftStream = builder.stream("leftSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));
    var rightStream = builder.stream("rightSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));

    leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
        .to("outputTopicSeconds");

    return builder.build();
}

Here is the test driver.

@Test
public void testSecondsJoinDoesNotWork() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
    var app = new KafkaStreamJoinTest();
    var serializer = new StringSerializer();

    try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
        var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
                serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
        leftTopic.pipeInput("1", "test string 1", 0L);
        leftTopic.pipeInput("1", "test string 2", 2001L);

        var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
                new StringDeserializer(), new StringDeserializer());

        assertFalse(outputTopic.isEmpty());
        System.out.println("First join result:");
        outputTopic.readKeyValuesToList()
                .forEach((keyValue) ->
                        System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));

        assertTrue(outputTopic.isEmpty());

        leftTopic.pipeInput("1", "test string 3", 4002L);
        leftTopic.pipeInput("1", "test string 4", 6004L);

        System.out.println("Second join result:");
        outputTopic.readKeyValuesToList()
                .forEach((keyValue) ->
                        System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));
    }
}

Here is the output:
First join result:
Key: 1 Value: test string 1, null
Second join result:

I would have expected a join to happen with "test string 2" and "test
string 3" being output with a null right value. Why didn't that happen?










Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-04-04 Thread Matthias J. Sax
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66


On 3/13/24 2:34 AM, Venkatesh Nagarajan wrote:

Just want to share another variant of the log message which is also related to 
metadata and rebalancing but has a different client reason:

INFO [GroupCoordinator 3]: Preparing to rebalance group  in state 
PreparingRebalance with old generation nnn (__consumer_offsets-nn) (reason: Updating 
metadata for member  during Stable; client reason: triggered followup 
rebalance scheduled for 0) (kafka.coordinator.group.GroupCoordinator)

Thank you.

Kind regards,
Venkatesh

From: Venkatesh Nagarajan 
Date: Wednesday, 13 March 2024 at 12:06 pm
To: users@kafka.apache.org 
Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Thanks very much for your important inputs, Matthias.

I will use the default METADATA_MAX_AGE_CONFIG. I set it to 5 hours when I saw 
a lot of such rebalancing related messages in the MSK broker logs:

INFO [GroupCoordinator 2]: Preparing to rebalance group  in state 
PreparingRebalance with old generation  (__consumer_offsets-nn) (reason: Updating 
metadata for member  during Stable; client reason: need to revoke partitions 
and re-join) (kafka.coordinator.group.GroupCoordinator)

I am guessing that the two are unrelated. If you have any suggestions on how to 
reduce such rebalancing, that will be very helpful.

Thank you very much.

Kind regards,
Venkatesh

From: Matthias J. Sax 
Date: Tuesday, 12 March 2024 at 1:31 pm
To: users@kafka.apache.org 
Subject: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Without detailed logs (maybe even DEBUG) hard to say.

But from what you describe, it could be a metadata issue? Why are you
setting


METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)


Refreshing metadata has nothing to do with rebalances, and a metadata
refresh does not trigger a rebalance.



-Matthias


On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:

Hi all,

A Kafka Streams application sometimes stops consuming events during load 
testing. Please find below the details:

Details of the app:


* Kafka Streams Version: 3.5.1
* Kafka: AWS MSK v3.6.0
* Consumes events from 6 topics
* Calls APIs to enrich events
* Sometimes joins two streams
* Produces enriched events in output topics

Runs on AWS ECS:

* Each task has 10 streaming threads
* Autoscaling based on offset lags and a maximum of 6 ECS tasks
* Input topics have 60 partitions each to match 6 tasks * 10 threads
* Fairly good spread of events across all topic partitions using partitioning 
keys

Settings and configuration:


* At least once semantics
* MAX_POLL_RECORDS_CONFIG: 10
* APPLICATION_ID_CONFIG

// Make rebalances rare and prevent stop-the-world rebalances

* Static membership (using GROUP_INSTANCE_ID_CONFIG)
* METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)
* MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
* SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

State store related settings:

* TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
* STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
* NUM_STANDBY_REPLICAS_CONFIG: 1


Symptoms:
The symptoms mentioned below occur during load tests:

Scenario# 1:
Steady input event stream

Observations:

* Gradually increasing offset lags which shouldn't happen normally as the 
streaming app is quite fast
* Events get processed

Scenario# 2:
No input events after the load test stops producing events

Observations:

* Offset lag stuck at ~5k
* Stable consumer group
* No events processed
* No errors or messages in the logs


Scenario# 3:
Restart the app when it stops processing events although offset lags are not 
zero

Observations:

* Offset lags start reducing and events start getting processed

Scenario# 4:
Transient errors occur while processing events


* A custom exception handler that implements StreamsUncaughtExceptionHandler 
returns StreamThreadExceptionResponse.REPLA

[jira] [Updated] (KAFKA-16458) Add contains method in KeyValue store interface

2024-04-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16458:

Labels: needs-kip  (was: )

> Add contains method in KeyValue store interface
> ---
>
> Key: KAFKA-16458
> URL: https://issues.apache.org/jira/browse/KAFKA-16458
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Ayoub Omari
>Priority: Minor
>  Labels: needs-kip
>
> In some stream processors, we sometimes just want to check if a key exists in 
> the state store or not.
>  
> I find calling .get() and checking if the return value is null a little bit 
> verbose
> {code:java}
> if (store.get(key) != null) {
> }{code}
>  
> But I am not sure if it is on purpose that we would like to keep the store 
> interface simple.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-02 Thread Matthias J. Sax

One more thing:

I was just looking into the WIP PR, and it seems we will also need to 
change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.



-Matthias

On 4/1/24 10:33 PM, Bruno Cadonna wrote:

Hi Walker and Matthias,

(2)
That is exactly my point about having a compile time error versus a 
runtime error. The added flexibility as proposed by Matthias sounds good 
to me.


Regarding the Named parameter, I was not aware that the processor that 
writes records to the global state store is named according to the name 
passed in by Consumed. I thought Consumed strictly specifies the names 
of source processors. So I am fine with not having an overload with a 
Named parameter.


Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce. 
Did you consider using different method names, like 
`addReadOnlyGlobalStore()` (for the optimized method, that would not 
reprocess data on restore), and maybe add `addModifiableGlobalStore()` 
(not a good name, but we cannot re-use the existing `addGlobalStore()` -- 
maybe somebody else has a good idea about a better `addXxxGlobalStore` 
name that would describe it well).


(2) I was thinking about Bruno's comment to limit the scope of the store 
builder for the optimized case. I think we should actually do 
something about it, because in the end, the runtime (ie, the 
`Processor` we hard wire) would need to pick a store it supports and 
cast to the corresponding store? If the cast fails, we hit a runtime 
exception, but by putting the store we cast to into the signature we 
can actually convert it into a compile time error, which seems better. 
-- If we want, we could make it somewhat flexible and support both 
`KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature 
would be `KeyValueStore`, but we explicitly check if the builder gives 
us a `TimestampedKeyValueStore` instance and use it properly.


If putting the signature does not work for some reason, we should at 
least clearly call it out in the JavaDocs what store type is expected.




-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken 
care

of.

Bruno and Matthias: The Named parameter doesn't really make sense to 
me to
put it here. The store in the Store builder is already named through 
what

Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter  (in
the DSL) and the internal streams builder uses the consumed object to 
make

a source name. I think we should just keep the Consumed object and used
that for the processor node name.

As for the limitation of the store builder interface I don't think it is
necessary. It could be something we add later if we really want to.

Anyways I think we are getting close enough to consensus that I'm 
going to

open a vote and hopefully we can get it voted on soon!

best,
Walker

On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax  
wrote:



Hey,

looking into the API, I am wondering why we would need to add an
overload taking a `Named` parameter?

StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
`Consumed` parameter that allows to set a name.



2.
I do not understand what you mean with "maximum flexibility". The

built-in processor needs to assume a given state store interface. That
means, users have to provide a state store that offers that 
interface. If

they do not they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile 
time.

Let me know whether I misunderstood something.

Yes, we could catch it at runtime. But I guess what I was trying to say
is different: I was trying to say, we should not limit the API to always
require a specific store, such that global stores can only be of a
certain type. Global stores should be allowed to be of any type. Hence,
if we add a built-in processor, it can only be one option, and we always
need to support custom processors, and might also want to try to allow
the restore optimization for custom processors (and thus other store
types), not just for our built-in processor (and our built-in stores).
Coupling the optimization to built-in stores would prevent us from applying
the optimization to custom stores.



@Almog: interesting idea. I tend to think that both issues are
orthogonal. If users pick to apply the optimization "added" by this 
KIP,

the bug you mentioned would still apply to global stores, and thus this
KIP is not addressing the issue you mentioned.

Personally, I also think that we don't need a KIP to fix the ticket you
mentioned? In the 

Re: [VOTE] KIP-1020 Move `window.size.ms` and `windowed.inner.serde.class` from `StreamsConfig` to TimeWindowedDe/Serializer class

2024-04-02 Thread Matthias J. Sax

+1 (binding)


-Matthias

On 4/1/24 7:44 PM, Lucia Cerchie wrote:

Hello everyone,

I'd like to call a vote on KIP-1020
.
It has been under discussion since Feb 15, and has received edits to the
KIP and approval by discussion participants.

Best,
Lucia Cerchie



Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics

2024-03-31 Thread Matthias J. Sax

The time window thing was just an idea. Happy to drop it.

For the oldest iterator metric, I would propose something simple like 
`iterator-opened-ms` and it would just be the actual timestamp when the 
iterator was opened. I don't think we need to compute the actual age, 
but user can to this computation themselves?


If we think reporting the age instead of just the timestamp is better, I 
would propose `iterator-max-age-ms`. It should be sufficient to call out 
(as it's kinda "obvious" anyway) that the metric applies to open 
iterators only.


And yes, I was hoping that the code inside MeteredXxxStore might already 
be set up in a way that custom stores would inherit the iterator metrics 
automatically -- I am just not sure, and left it as an exercise for 
somebody to confirm :)



Nit: the KIP says it's a store-level metric, but I think it would be 
good to say explicitly that it's recorded with DEBUG level only?




-Matthias


On 3/28/24 2:52 PM, Nick Telford wrote:

Quick addendum:

My suggested metric "oldest-open-iterator-age-seconds" should be
"oldest-open-iterator-age-ms". Milliseconds is obviously a better
granularity for such a metric.

Still accepting suggestions for a better name.

On Thu, 28 Mar 2024 at 13:41, Nick Telford  wrote:


Hi everyone,

Sorry for leaving this for so long. So much for "3 weeks until KIP freeze"!

On Sophie's comments:
1. Would Matthias's suggestion of a separate metric tracking the age of
the oldest open iterator (within the tag set) satisfy this? That way we can
keep iterator-duration-(avg|max) for closed iterators, which can be useful
for performance debugging for iterators that don't leak. I'm not sure what
we'd call this metric, maybe: "oldest-open-iterator-age-seconds"? Seems
like a mouthful.

2. You're right, it makes more sense to provide
iterator-duration-(avg|max). Honestly, I can't remember why I had "total"
before, or why I was computing a rate-of-change over it.

3, 4, 5, 6. Agreed, I'll make all those changes as suggested.

7. Combined with Matthias's point about RocksDB, I'm convinced that this
is the wrong KIP for these. I'll introduce the additional Rocks metrics in
another KIP.

On Matthias's comments:
A. Not sure about the time window. I'm pretty sure all existing avg/max
metrics are since the application was started? Any other suggestions here
would be appreciated.

B. Agreed. See point 1 above.

C. Good point. My focus was very much on Rocks memory leaks when I wrote
the first draft. I can generalise it. My only concern is that it might make
it more difficult to detect Rocks iterator leaks caused *within* our
high-level iterator, e.g. RocksJNI, RocksDB, RocksDBStore, etc. But we
could always provide a RocksDB-specific metric for this, as you suggested.

D. Hmm, we do already have MeteredKeyValueIterator, which automatically
wraps the iterator from inner-stores of MeteredKeyValueStore. If we
implemented these metrics there, then custom stores would automatically
gain the functionality, right? This seems like a pretty logical place to
implement these metrics, since MeteredKeyValueStore is all about adding
metrics to state stores.


I imagine the best way to implement this would be to do so at the
high-level iterator rather than implementing it separately for each
specific iterator implementation for every store type.


Sophie, does MeteredKeyValueIterator fit with your recommendation?

Thanks for your thoughts everyone, I'll update the KIP now.

Nick
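
(For illustration only: a rough sketch of the bookkeeping a wrapping iterator
layer could do to back an "oldest-open-iterator-age-ms" gauge. Class, method,
and metric names below are hypothetical and not the actual Kafka Streams /
KIP-989 implementation.)

import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: tracks open iterators by their open timestamp so a gauge can report
// the age of the oldest one; not the real MeteredKeyValueIterator code.
public class OpenIteratorTrackerSketch {
    // open-timestamp -> number of iterators opened at that timestamp
    private final NavigableMap<Long, AtomicLong> openIterators = new ConcurrentSkipListMap<>();

    // called when a wrapped iterator is created; returns the handle to pass to onClosed()
    public long onOpened(final long nowMs) {
        openIterators.computeIfAbsent(nowMs, t -> new AtomicLong()).incrementAndGet();
        return nowMs;
    }

    // called from the wrapping iterator's close()
    public void onClosed(final long openedAtMs) {
        openIterators.computeIfPresent(openedAtMs, (t, count) ->
                count.decrementAndGet() == 0 ? null : count);
    }

    // value an "oldest-open-iterator-age-ms" gauge could report (0 if none open)
    public long oldestOpenIteratorAgeMs(final long nowMs) {
        final Map.Entry<Long, AtomicLong> oldest = openIterators.firstEntry();
        return oldest == null ? 0L : nowMs - oldest.getKey();
    }
}
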

On Thu, 14 Mar 2024 at 03:37, Sophie Blee-Goldman 
wrote:


About your last two points: I completely agree that we should try to
make this independent of RocksDB, and should probably adopt a
general philosophy of being store-implementation agnostic unless
there is good reason to focus on a particular store type: eg if it was
only possible to implement for certain stores, or only made sense in
the context of a certain store type but not necessarily stores in general.

While leaking memory due to unclosed iterators on RocksDB stores is
certainly the most common issue, I think Matthias sufficiently
demonstrated that the problem of leaking iterators is not actually
unique to RocksDB, and we should consider including in-memory
stores at the very least. I also think that at this point, we may as well
just implement the metrics for *all* store types, whether rocksdb or
in-memory or custom. Not just because it probably applies to all
store types (leaking iterators are rarely a good thing!) but because
I imagine the best way to implement this would be to do so at the
high-level iterator rather than implementing it separately for each
specific iterator implementation for every store type.

That said, I haven't thought all that carefully about the implementation
yet -- it just strikes me as easiest to do at the top level of the store
hierarchy rather than at the bottom. My gut instinct may very well be
wrong, but that's what it's saying

On Thu, Mar 7

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-31 Thread Matthias J. Sax

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce. Did 
you consider using different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not 
reprocess data on restore), and maybe add `addModifiableGlobalStore()` 
(not a good name, but we cannot re-use existing `addGlobalStore()` -- 
maybe somebody else has a good idea about a better `addXxxGlobalStore` 
that would describe it well).


(2) I was thinking about Bruno's comment to limit the scope of the store
builder for the optimized case. I think we should actually do something
about it, because in the end, the runtime (ie, the `Processor` we hard
wire) would need to pick a store it supports and cast to the
corresponding store? If the cast fails, we hit a runtime exception, but
by putting the store we cast to into the signature we can actually
convert it into a compile time error, which seems better. -- If we want,
we could make it somewhat flexible and support both `KeyValueStore` and
`TimestampedKeyValueStore` -- ie, the signature would be `KeyValueStore`
but we explicitly check if the builder gives us a
`TimestampedKeyValueStore` instance and use it properly.


If putting the store type into the signature does not work for some reason,
we should at least clearly call out in the JavaDocs what store type is expected.




-Matthias
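
(To illustrate the compile-time vs. runtime point above, a sketch of the two
signature styles; the method names are hypothetical and not the KIP-1024 API.)

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

// Sketch only; both method names are illustrative.
public interface GlobalStoreSignatureSketch {

    // Fully generic: a builder for any StateStore compiles, so a store the
    // built-in restore processor cannot handle only fails at runtime (cast).
    void addGlobalStore(StoreBuilder<?> storeBuilder, String topic);

    // Typed: the expected store interface is part of the signature, so passing
    // a builder for a different store type is already a compile-time error.
    void addReadOnlyGlobalStore(StoreBuilder<KeyValueStore<Bytes, byte[]>> storeBuilder,
                                String topic);
}
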



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthias. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter  (in
the DSL) and the internal streams builder uses the consumed object to make
a source name. I think we should just keep the Consumed object and use
that for the processor node name.

As for the limitation of the store builder interface I don't think it is
necessary. It could be something we add later if we really want to.

Anyways I think we are getting close enough to consensus that I'm going to
open a vote and hopefully we can get it voted on soon!

best,
Walker

On Thu, Mar 28, 2024 at 5:55 AM Matthias J. Sax  wrote:


Hey,

looking into the API, I am wondering why we would need to add an
overload taking a `Named` parameter?

StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a
`Consumed` parameter that allows to set a name.



2.
I do not understand what you mean with "maximum flexibility". The

built-in processor needs to assume a given state store interface. That
means, users have to provide a state store that offers that interface. If
they do not they will get a runtime exception. If we require a store
builder for a given interface, we can catch the mistake at compile time.
Let me know whether I misunderstood something.

Yes, we could catch it at runtime. But I guess what I was trying to say
is different: I was trying to say, we should not limit the API to always
require a specific store, such that global stores can only be of a
certain type. Global Stores should be allowed to be of any type. Hence,
if we add a built-in processor, it can only be one option, and we always
need to support custom processor, and might also want to try to allow
the restore optimization for custom processor (and thus other store
types), not just for our built-in processor (and our built-in stores).
Coupling the optimization to built-in stores would prevent us from applying
the optimization to custom stores.



@Almog: interesting idea. I tend to think that both issues are
orthogonal. If users pick to apply the optimization "added" by this KIP,
the bug you mentioned would still apply to global stores, and thus this
KIP is not addressing the issue you mentioned.

Personally, I also think that we don't need a KIP to fix the ticket you
mentioned? In the end, we need to skip records during restore, and it
seems it does not make sense to make this configurable?



-Matthias


On 3/26/24 4:24 PM, Almog Gavra wrote:

Thanks for the thoughts Bruno!


Do you mean an API to configure restoration instead of boolean flag

reprocessOnRestore?

Yes, this is exactly the type of thing I was musing (but I don't have any
concrete suggestions). It feels like that would give the flexibility to do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now

and

Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-03-28 Thread Matthias J. Sax
It seems that `MockRecordMetadata` is a private class, and thus not part 
of the public API. If there are any changes required, we don't need to 
discuss on the KIP.



For `CapturedPunctuator` and `CapturedForward` it's a little bit more 
tricky. My gut feeling is that the classes might not need to be
changed, but if we use them within `MockProcessorContext` and 
`MockFixedKeyProcessorContext` it might be weird to keep the current 
nesting... The problem I see is that it's not straightforward how to
move the classes w/o breaking compatibility, nor if we duplicate them as 
standalone classes w/o a larger "splash radius". (We would need to add 
new overloads for MockProcessorContext#scheduledPunctuators() and 
MockProcessorContext#forwarded()).


Might be good to hear from others if we think it's worth these larger
changes to get rid of the nesting, or just accept the somewhat less-than-ideal
nesting, as it technically is not a real issue?



-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more to do
with the internals of the class (MockRecordMetadata, CapturedPunctuator and
CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the internal
classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax  wrote:


Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few times already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are setup, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext<KForward, VForward>
implements FixedKeyProcessorContext<KForward, VForward>,
   RecordCollector.Supplier


Of course, if there is code we can share between both mock-contexts we
should do this, but it should not leak into the public API?


-Matthias
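
(For illustration, a bare-bones sketch of what a standalone, non-extending mock
could look like; the class name and details are hypothetical and not the
KIP-1027 proposal itself. The remaining context methods are omitted, so the
sketch is kept abstract.)

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Sketch only: captures forwarded records for assertions in tests.
public abstract class MockFixedKeyProcessorContextSketch<KForward, VForward>
        implements FixedKeyProcessorContext<KForward, VForward> {

    private final List<FixedKeyRecord<? extends KForward, ? extends VForward>> forwarded =
            new ArrayList<>();

    @Override
    public <K extends KForward, V extends VForward> void forward(final FixedKeyRecord<K, V> record) {
        forwarded.add(record);
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(final FixedKeyRecord<K, V> record,
                                                                 final String childName) {
        forwarded.add(record);
    }

    public List<FixedKeyRecord<? extends KForward, ? extends VForward>> forwarded() {
        return new ArrayList<>(forwarded);
    }
}
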



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey







Re: [DISCUSS] KIP-1020: Deprecate `window.size.ms` in `StreamsConfig`

2024-03-28 Thread Matthias J. Sax

Thanks. I think you can start a VOTE.

-Matthias


On 3/20/24 9:24 PM, Lucia Cerchie wrote:

thanks Sophie, I've made the updates, would appreciate one more look before
submission

On Wed, Mar 20, 2024 at 8:36 AM Sophie Blee-Goldman 
wrote:


A few minor notes on the KIP but otherwise I think you can go ahead and
call for a vote!

1. Need to update the Motivation section to say "console clients" or
"console consumer/producer" instead of " plain consumer client"
2. Remove the first paragraph under "Public Interfaces" (ie the KIP-writing
instructions) and also list the new config definitions here. You can just
add a code snippet for each class (TimeWindowedDe/Serializer) with the
actual variable definition you'll be adding. Maybe also add a code snippet
for StreamsConfig with the @Deprecated annotation added to the two configs
we're deprecating
3. nit: remove the "questions" under "Compatibility, Deprecation, and
Migration Plan", ie the stuff from the KIP-writing template. Just makes it
easier to read
4. In "Test Plan" we should also have a unit test to make sure the new "
windowed.inner.de/serializer.class" takes preference in the case that both
it and the old "windowed.inner.serde.class" config are both specified

Also, this is more of a question than a suggestion, but is the KIP title
perhaps a bit misleading in that people might assume these configs will no
longer be available for use at all? What do people think about changing it
to something like

*Move `window.size.ms` and
`windowed.inner.serde.class` from `StreamsConfig` to
TimeWindowedDe/Serializer class*

A bit long/clunky but at least it gets the point across about where folks
can find these configs going forward.

On Mon, Mar 18, 2024 at 6:26 PM Lucia Cerchie

wrote:


Thanks for the discussion!

I've updated the KIP here with what I believe are the relevant pieces,
please let me know if anything is missing:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804


On Sun, Mar 17, 2024 at 7:09 PM Sophie Blee-Goldman <

sop...@responsive.dev



wrote:


Sounds good!

@Lucia when you have a moment can you update the KIP with
the new proposal, including the details that Matthias pointed
out in his last response? After that's done I think you can go
ahead and call for a vote whenever you're ready!

On Sat, Mar 16, 2024 at 7:35 PM Matthias J. Sax 

wrote:



Thanks for the summary. Sounds right to me. That is what I would

propose.


As you pointed out, we of course still need to support the current
configs, and we should log a warning when in use (even if the new one is
in use IMHO) -- but that's more an implementation detail.

I agree that the new config should take preference in case both are
specified. This should be pointed out in the KIP, as it's an important
contract the user needs to understand.


-Matthias

On 3/14/24 6:18 PM, Sophie Blee-Goldman wrote:


Should we change it to `.serializer` and `.deserializer`?


That's a good point -- if we're going to split this up by defining the
config in both the TimeWindowedSerializer and TimeWindowedDeserializer,
then it makes perfect sense to go a step further and actually define
only the relevant de/serializer class instead of the full serde

Just to put this all together, it sounds like the proposal is to:

1) Deprecate both these configs where they appear in StreamsConfig
(as per the original plan in the KIP, just reiterating it here)

2) Don't "define" either config in any specific client's Config

class,

but just define a String variable with the config name in the

relevant

de/serializer class, and maybe point people to it in the docs

somewhere


3) We would add three new public String variables for three different
configs across two classes, specifically:

In TimeWindowedSerializer:
- define a constant for "windowed.inner.serializer.class"
In TimeWindowedDeserializer:
- define a constant for "windowed.inner.deserializer.class"
- define a constant for "window.size.ms"

4) Lastly, we would update the windowed de/serializer implementations
to check for the new configs (ie "windowed.inner.de/serializer.class")
and use the provided de/serializer class, if one was given. If the new
configs are not present, they would fall back to the original/current
logic (ie that based on the old "windowed.inner.serde.class" config)


I think that's everything. Does this sound about right for where we want
to go with these configs?
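
(A minimal sketch of what steps 3) and 4) above could look like in code; the
field and method names here are hypothetical, not the final KIP-1020 API.)

import java.util.Map;

// Sketch only: defines the new constants on the (de)serializer classes and
// prefers the new config over the deprecated serde config.
public class WindowedConfigSketch {

    // step 3: constants defined directly on TimeWindowedSerializer/Deserializer
    public static final String WINDOWED_INNER_DESERIALIZER_CLASS = "windowed.inner.deserializer.class";
    public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    // step 4: the new config wins; fall back to the old "windowed.inner.serde.class"
    static Object innerClassConfig(final Map<String, ?> configs) {
        final Object newConfig = configs.get(WINDOWED_INNER_DESERIALIZER_CLASS);
        if (newConfig != null) {
            return newConfig;
        }
        return configs.get("windowed.inner.serde.class"); // deprecated fallback
    }
}
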

On Thu, Mar 14, 2024 at 4:58 PM Matthias J. Sax 

wrote:



By "don't add them" do you just mean we would not have any

actual

variables defined anywhere for these configs (eg WINDOW_SIZE_MS)
and simply document -- somewhere -- that one can use the string
"window.size.ms" when configuring a command-line client with a
win

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-28 Thread Matthias J. Sax

Hey,

looking into the API, I am wondering why we would need to add an 
overload taking a `Named` parameter?


StreamsBuilder.addGlobalStore() (and .addGlobalTable()) already takes a 
`Consumed` parameter that allows to set a name.




2.
I do not understand what you mean with "maximum flexibility". The built-in processor needs to assume a given state store interface. That means, users have to provide a state store that offers that interface. If they do not they will get a runtime exception. If we require a store builder for a given interface, we can catch the mistake at compile time. Let me know whether I misunderstood something. 


Yes, we could catch it at runtime. But I guess what I was trying to say 
is different: I was trying to say, we should not limit the API to always 
require a specific store, such that global stores can only be of a 
certain type. Global Stores should be allowed to be of any type. Hence, 
if we add a built-in processor, it can only be one option, and we always 
need to support custom processor, and might also want to try to allow 
the restore optimization for custom processor (and thus other store 
types), not just for our built-in processor (and our built-in stores). 
Coupling the optimization to built-in stores would prevent us from applying
the optimization to custom stores.




@Almog: interesting idea. I tend to think that both issues are 
orthogonal. If users pick to apply the optimization "added" by this KIP, 
the bug you mentioned would still apply to global stores, and thus this 
KIP is not addressing the issue you mentioned.


Personally, I also think that we don't need a KIP to fix the ticket you 
mentioned? In the end, we need to skip records during restore, and it 
seems it does not make sense to make this configurable?




-Matthias


On 3/26/24 4:24 PM, Almog Gavra wrote:

Thanks for the thoughts Bruno!


Do you mean an API to configure restoration instead of boolean flag

reprocessOnRestore?

Yes, this is exactly the type of thing I was musing (but I don't have any
concrete suggestions). It feels like that would give the flexibility to do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now and
let the discussion continue :)

Cheers,
Almog

On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna  wrote:


Hi Almog,

Do you mean an API to configure restoration instead of boolean flag
reprocessOnRestore?

Do you already have an idea?

The proposal in the KIP is focused on the processor that updates the
global state whereas in the case of GlobalKTable and source KTable the
issues lies in the deserialization of records from the input topics, but
only if the deserialization error handler is configured to drop the
problematic record. Additionally, for source KTable the source topic
optimization must be turned on to run into the issue. I am wondering how
a unified API for global stores, GlobalKTable, and source KTable might
look like.

While it is an interesting question, I am in favor of deferring this to
a separate KIP.

Best,
Bruno

On 3/26/24 12:49 AM, Almog Gavra wrote:

Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they
deserve more love :)

Scope creep alert (which I'm generally against and certainly still

support

this KIP without but I want to see if there's an elegant way to address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier"

which

I suppose fixed a long-standing bug (
https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the source-changelog
optimization. Since this API could be used to signal "I want to reprocess
on restore" I'm wondering whether it makes sense to design this API in a
way that could be extended for KTables as well so a fix for KAFKA-8037
would be possible with the same mechanism. Thoughts?

Cheers,
Almog

On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
 wrote:


Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna 

wrote:



Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code 

[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831709#comment-17831709
 ] 

Matthias J. Sax commented on KAFKA-16382:
-

Thanks for clarifying. Seems I did not understand the ticket correctly.
{quote} # We start Kafka, it process the input topic from scratch but 
"optimise" internally nulls. The output topic *still* contains mapped result. 
The delete message *never* reach the output topic.{quote}
You are right that this does not sound correct.

Seems you can reliably reproduce the issue, so it should hopefully not be too
hard to find a fix. Thanks for reporting it.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10409) Refactor Kafka Streams RocksDb iterators

2024-03-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831704#comment-17831704
 ] 

Matthias J. Sax commented on KAFKA-10409:
-

I don't think anybody worked on this ticket (not sure why status actually says 
"in progress" – seems to be wrong).

> Refactor Kafka Streams RocksDb iterators 
> -
>
> Key: KAFKA-10409
> URL: https://issues.apache.org/jira/browse/KAFKA-10409
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Minor
>  Labels: newbie
>
> From [https://github.com/apache/kafka/pull/9137#discussion_r470345513] :
> [~ableegoldman] : 
> > Kind of unrelated, but WDYT about renaming {{RocksDBDualCFIterator}} to 
> > {{RocksDBDualCFAllIterator}} or something on the side? I feel like these 
> > iterators could be cleaned up a bit in general to be more understandable – 
> > for example, it's weird that we do the {{iterator#seek}}-ing in the actual 
> > {{all()}} method but for range queries we do the seeking inside the 
> > iterator constructor.
> and [https://github.com/apache/kafka/pull/9137#discussion_r470361726] :
> > Personally I found the {{RocksDBDualCFIterator}} logic a bit difficult to 
> > follow even before the reverse iteration, so it would be nice to have some 
> > tests specifically covering reverse iterators over multi-column-family 
> > timestamped stores



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15951) MissingSourceTopicException should include topic names

2024-03-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831701#comment-17831701
 ] 

Matthias J. Sax commented on KAFKA-15951:
-

Sorry for late reply. I was traveling.

Of course you can take the ticket. Thanks for the PR. I've put it into my 
review backlog.

Also added you as a contributor and assigned the ticket to you. You can also 
self-assign tickets now.

> MissingSourceTopicException should include topic names
> --
>
> Key: KAFKA-15951
> URL: https://issues.apache.org/jira/browse/KAFKA-15951
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
>
> As the title says – we don't include topic names in all cases, which makes it 
> harder for users to identify the root cause.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15951) MissingSourceTopicException should include topic names

2024-03-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15951:
---

Assignee: sanghyeok An

> MissingSourceTopicException should include topic names
> --
>
> Key: KAFKA-15951
> URL: https://issues.apache.org/jira/browse/KAFKA-15951
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: sanghyeok An
>Priority: Major
>
> As the title says – we don't include topic names in all cases, which makes it 
> harder for users to identify the root cause.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-03-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831695#comment-17831695
 ] 

Matthias J. Sax commented on KAFKA-16263:
-

Absolutely. If a ticket is not assigned to anybody, feel free to assign it to
yourself and pick it up.

> Add Kafka Streams docs about available listeners/callback
> -
>
> Key: KAFKA-16263
> URL: https://issues.apache.org/jira/browse/KAFKA-16263
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>            Reporter: Matthias J. Sax
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: beginner, newbie
>
> Kafka Streams allows registering all kinds of listeners and callbacks (eg, 
> uncaught-exception-handler, restore-listeners, etc) but those are not in the 
> documentation.
> A good place might be 
> [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-03-28 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16263:
---

Assignee: Kuan Po Tseng

> Add Kafka Streams docs about available listeners/callback
> -
>
> Key: KAFKA-16263
> URL: https://issues.apache.org/jira/browse/KAFKA-16263
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>            Reporter: Matthias J. Sax
>Assignee: Kuan Po Tseng
>Priority: Minor
>  Labels: beginner, newbie
>
> Kafka Streams allows registering all kinds of listeners and callbacks (eg, 
> uncaught-exception-handler, restore-listeners, etc) but those are not in the 
> documentation.
> A good place might be 
> [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831468#comment-17831468
 ] 

Matthias J. Sax commented on KAFKA-16407:
-

Did 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
 not fix this?

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831208#comment-17831208
 ] 

Matthias J. Sax commented on KAFKA-16382:
-

An application reset does not touch the output topic. So it's expected that 
output from the first run of the app is still in the output topic after the 
reset.
{quote}The issue is NOT reproduced if internal cache is disabled.
{quote}
That also sounds like expected behavior. Caching implies that the output 
might not contain all intermediate results.

I think we can close this ticket as "not a bug" ?

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New committer: Christo Lolov

2024-03-27 Thread Matthias J. Sax

Congrats!

On 3/26/24 9:39 PM, Christo Lolov wrote:

Thank you everyone!

It wouldn't have been possible without quite a lot of reviews and extremely
helpful inputs from you and the rest of the community! I am looking forward
to working more closely with you going forward :)

On Tue, 26 Mar 2024 at 14:31, Kirk True  wrote:


Congratulations Christo!


On Mar 26, 2024, at 7:27 AM, Satish Duggana 

wrote:


Congratulations Christo!

On Tue, 26 Mar 2024 at 19:20, Ivan Yurchenko  wrote:


Congrats!

On Tue, Mar 26, 2024, at 14:48, Lucas Brutschy wrote:

Congrats!

On Tue, Mar 26, 2024 at 2:44 PM Federico Valeri 

wrote:


Congrats!

On Tue, Mar 26, 2024 at 2:27 PM Mickael Maison <

mickael.mai...@gmail.com> wrote:


Congratulations Christo!

On Tue, Mar 26, 2024 at 2:26 PM Chia-Ping Tsai 

wrote:


Congrats Christo!

Chia-Ping









[jira] [Commented] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831202#comment-17831202
 ] 

Matthias J. Sax commented on KAFKA-16404:
-

Sounds like an env issue to me. I would propose to close this as "not a bug".

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
> -
>
> Key: KAFKA-16404
> URL: https://issues.apache.org/jira/browse/KAFKA-16404
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
>  
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > 
> WordCountDemoTest > testGetStreamsConfig() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276)
>         at 
> app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: While lock file: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK:
>  Resource temporarily unavailable
>             at app//org.rocksdb.RocksDB.open(Native Method)
>             at app//org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831201#comment-17831201
 ] 

Matthias J. Sax commented on KAFKA-16403:
-

Sounds like an env issue but not a bug to me? I would tend to close this as "not a 
bug"?

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > 
> WordCountDemoTest > testCountListOfWords() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276)
>         at 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: Corruption: IO error: No such file or 
> directory: While open a file for random read: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb:
>  No such file or directory in file 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>             at org.rocksdb.RocksDB.open(Native Method)
>             at org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1020: Deprecate `window.size.ms` in `StreamsConfig`

2024-03-16 Thread Matthias J. Sax

Thanks for the summary. Sounds right to me. That is what I would propose.

As you pointed out, we of course still need to support the current 
configs, and we should log a warning when in use (even if the new one is 
in use IMHO) -- but that's more an implementation detail.


I agree that the new config should take preference in case both are 
specified. This should be pointed out in the KIP, as it's an important 
contract the user needs to understand.



-Matthias

On 3/14/24 6:18 PM, Sophie Blee-Goldman wrote:


Should we change it to `.serializer` and `.deserializer`?


That's a good point -- if we're going to split this up by defining the
config
in both the TimeWindowedSerializer and TimeWindowedDeserializer,
then it makes perfect sense to go a step further and actually define
only the relevant de/serializer class instead of the full serde

Just to put this all together, it sounds like the proposal is to:

1) Deprecate both these configs where they appear in StreamsConfig
(as per the original plan in the KIP, just reiterating it here)

2) Don't "define" either config in any specific client's Config class,
but just define a String variable with the config name in the relevant
de/serializer class, and maybe point people to it in the docs somewhere

3) We would add three new public String variables for three different
configs across two classes, specifically:

In TimeWindowedSerializer:
   - define a constant for "windowed.inner.serializer.class"
In TimeWindowedDeserializer:
   - define a constant for "windowed.inner.deserializer.class"
   - define a constant for "window.size.ms"

4) Lastly, we would update the windowed de/serializer implementations
to check for the new configs (ie "windowed.inner.de/serializer.class")
and use the provided de/serializer class, if one was given. If the new
configs are not present, they would fall back to the original/current
logic (ie that based on the old "windowed.inner.serde.class" config)

I think that's everything. Does this sound about right for where we want
to go with these configs?

On Thu, Mar 14, 2024 at 4:58 PM Matthias J. Sax  wrote:


By "don't add them" do you just mean we would not have any actual
variables defined anywhere for these configs (eg WINDOW_SIZE_MS)
and simply document -- somewhere -- that one can use the string
"window.size.ms" when configuring a command-line client with a
windowed serde?


Yes. That's the idea.




I assume you aren't proposing

to remove the ability to use and understand this config from the
implementations themselves, but correct me if that's wrong.


No, that would effectively break what we fixed with the original KIP :)




Are there any other configs in similar situations that we could look
to for precedent?


Not aware of any others, either.




If these are truly the first/only of their kind, I would vote to just

stick

them in the appropriate class. As for which class to put them in, I
think I'm convinced that "window.size.ms" should only go in the
TimeWindowedDeserializer rather than sticking them both in the
TimeWindowedSerde as I originally suggested. However, I would
even go a step further and not place the "inner.window.class.serde"
in the TimeWindowedSerde class either. To me, it actually makes
the most sense to define it in both the TimeWindowedSerializer
and the TimeWindowedDeserializer.


Not sure either. What you propose is fine with me. However, I am
wondering about the config names... Why is it `serde` for this case?
Should we change it to `.serializer` and `.deserializer`?



-Matthias


On 3/13/24 8:19 PM, Sophie Blee-Goldman wrote:

By "don't add them" do you just mean we would not have any actual
variables defined anywhere for these configs (eg WINDOW_SIZE_MS)
and simply document -- somewhere -- that one can use the string
"window.size.ms" when configuring a command-line client with a
windowed serde? Or something else? I assume you aren't proposing
to remove the ability to use and understand this config from the
implementations themselves, but correct me if that's wrong.

Are there any other configs in similar situations that we could look
to for precedent? I personally am not aware of any but by definition
I suppose these would be hard to discover unless you were actively
looking for them, so I'm wondering if there might be other "shadow
configs" elsewhere in the code base.

If these are truly the first/only of their kind, I would vote to just

stick

them in the appropriate class. As for which class to put them in, I
think I'm convinced that "window.size.ms" should only go in the
TimeWindowedDeserializer rather than sticking them both in the
TimeWindowedSerde as I originally suggested. However, I would
even go a step further and not place the "inner.window.class.serde"
in the TimeWindowedSerde class either. To me, it actually makes
the most

Re: [DISCUSS] KIP-1020: Deprecate `window.size.ms` in `StreamsConfig`

2024-03-14 Thread Matthias J. Sax

By "don't add them" do you just mean we would not have any actual
variables defined anywhere for these configs (eg WINDOW_SIZE_MS)
and simply document -- somewhere -- that one can use the string
"window.size.ms" when configuring a command-line client with a
windowed serde? 


Yes. That's the idea.




I assume you aren't proposing

to remove the ability to use and understand this config from the
implementations themselves, but correct me if that's wrong.


No, that would effectively break what we fixed with the original KIP :)




Are there any other configs in similar situations that we could look
to for precedent?


Not aware of any others, either.




If these are truly the first/only of their kind, I would vote to just stick
them in the appropriate class. As for which class to put them in, I
think I'm convinced that "window.size.ms" should only go in the
TimeWindowedDeserializer rather than sticking them both in the
TimeWindowedSerde as I originally suggested. However, I would
even go a step further and not place the "inner.window.class.serde"
in the TimeWindowedSerde class either. To me, it actually makes
the most sense to define it in both the TimeWindowedSerializer
and the TimeWindowedDeserializer.


Not sure either. What you propose is fine with me. However, I am 
wondering about the config names... Why is it `serde` for this case? 
Should we change it to `.serializer` and `.deserializer`?




-Matthias


On 3/13/24 8:19 PM, Sophie Blee-Goldman wrote:

By "don't add them" do you just mean we would not have any actual
variables defined anywhere for these configs (eg WINDOW_SIZE_MS)
and simply document -- somewhere -- that one can use the string
"window.size.ms" when configuring a command-line client with a
windowed serde? Or something else? I assume you aren't proposing
to remove the ability to use and understand this config from the
implementations themselves, but correct me if that's wrong.

Are there any other configs in similar situations that we could look
to for precedent? I personally am not aware of any but by definition
I suppose these would be hard to discover unless you were actively
looking for them, so I'm wondering if there might be other "shadow
configs" elsewhere in the code base.

If these are truly the first/only of their kind, I would vote to just stick
them in the appropriate class. As for which class to put them in, I
think I'm convinced that "window.size.ms" should only go in the
TimeWindowedDeserializer rather than sticking them both in the
TimeWindowedSerde as I originally suggested. However, I would
even go a step further and not place the "inner.window.class.serde"
in the TimeWindowedSerde class either. To me, it actually makes
the most sense to define it in both the TimeWindowedSerializer
and the TimeWindowedDeserializer.

The reason being that, as discussed above, the only use case for
these configs would be in the console consumer/producer which
only uses the Serializer or Deserializer, and would never actually
be used by/in Streams where we use the Serde version. And while
defining the  "inner.window.class.serde" in two places might seem
redundant, this would mean that all the configs needed to properly
configure the specific class being used by the particular kind of
consumer client -- that is, Deserializer for a console consumer and
Serializer for a console producer -- would be located in that exact
class. I assume this would make them much easier to discover
and be used than having to search for configs defined in classes
you don't even need for the console client, like the Serde form

Just my two cents -- happy to hear other opinions on this

On Mon, Mar 11, 2024 at 6:58 PM Matthias J. Sax  wrote:


Yes, it's used inside `TimeWindowedSerializer` and actually also inside
`TimeWindowDeserializer`.

However, it does IMHO not change that we should remove it from
`StreamsConfig` because both configs are not intended to be used in Java
code... If one writes Java code, they should use

new TimeWindowedSerializer(Serializer)
new TimeWindowDeserializer(Deserializer, Long)
new TimeWindowSerdes(Serde, Long)

and thus they don't need either config.

The configs are only needed for command line tools, which create the
(de)serializer via reflection using the default constructor.

Does this make sense?
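
(As a concrete example of the direct-construction path described above; the
class name and the 500 ms window size are just illustrative values.)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;

// Sketch: in Java code the inner (de)serializer and window size are passed to
// the constructors directly, so neither config is needed. Only reflective
// instantiation (e.g. by the console consumer) relies on the configs instead.
public class WindowedSerdeUsageSketch {
    public static void main(final String[] args) {
        final TimeWindowedSerializer<String> serializer =
                new TimeWindowedSerializer<>(Serdes.String().serializer());
        final TimeWindowedDeserializer<String> deserializer =
                new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);
        System.out.println(serializer.getClass().getSimpleName() + " / "
                + deserializer.getClass().getSimpleName() + " constructed without configs");
    }
}
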



The only open question is really if and where to add them... Strictly
speaking, we don't need either config as a public variable, as nobody
should use them in Java code. To me, it just feels right/better to make
them public for documentation purposes, to show that these configs exist?

`inner.window.class.serde` has "serde" in it's name, so we could add it
to `TimeWindowSerdes`? For `window.size.ms`, it's only used by the
deserialize to maybe add it there? Just some ideas. -- Or we sidestep
this question and just don't add them; also fine with me.


-Matthias

On 3/1

[jira] [Updated] (KAFKA-16377) Fix flaky HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores

2024-03-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16377:

Component/s: streams
 unit tests

> Fix flaky 
> HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores
> --
>
> Key: KAFKA-16377
> URL: https://issues.apache.org/jira/browse/KAFKA-16377
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> [2024-03-13T16:07:11.125Z] Gradle Test Run :streams:test > Gradle Test 
> Executor 95 > HighAvailabilityTaskAssignorIntegrationTest > 
> shouldScaleOutWithWarmupTasksAndPersistentStores(String, TestInfo) > 
> "shouldScaleOutWithWarmupTasksAndPersistentStores(String, 
> TestInfo).balance_subtopology" FAILED
> [2024-03-13T16:07:11.125Z] java.lang.AssertionError: the first assignment 
> after adding a node should be unstable while we warm up the state.
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.assertFalseNoRetry(HighAvailabilityTaskAssignorIntegrationTest.java:310)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.lambda$shouldScaleOutWithWarmupTasks$7(HighAvailabilityTaskAssignorIntegrationTest.java:237)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:395)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:443)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:392)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasks(HighAvailabilityTaskAssignorIntegrationTest.java:232)
> [2024-03-13T16:07:11.125Z] at 
> org.apache.kafka.streams.integration.HighAvailabilityTaskAssignorIntegrationTest.shouldScaleOutWithWarmupTasksAndPersistentStores(HighAvailabilityTaskAssignorIntegrationTest.java:130)
> {quote}
> https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-15474/runs/12/nodes/9/steps/88/log/?start=0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Personal branches under apache/kafka

2024-03-13 Thread Matthias J. Sax

+1

Should be fine to just delete all of them (as long as nobody raised 
objections).


Not sure if we could enable some protection on the GitHub side that disallows
pushing into non-existing branches and thus avoids accidental branch creation?



-Matthias

On 3/13/24 11:39 AM, Josep Prat wrote:

Hi Michael,

I think it's a good idea. Only "official" branches should exist in the
upstream repo.
I guess the only exception would be if a massive feature would be done by
different individuals collaborating and they would need a "neutral" place
for the branch to be. But This didn't happen yet and I doubt it will in the
near future.

Best,

---
Josep Prat
Open Source Engineering Director, aivenjosep.p...@aiven.io   |
+491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Wed, Mar 13, 2024, 19:27 José Armando García Sancio
 wrote:


On Wed, Mar 13, 2024 at 11:02 AM Mickael Maison
 wrote:

What do you think?


I agree. I wouldn't be surprised if these branches (not trunk or
release branches) were created by mistake by the committer.

Thanks,
--
-José





Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-13 Thread Matthias J. Sax
If the custom store is a key-value store, yes, we could do this. But the 
interface does not enforce a key-value store, it's just the most generic 
`StateStore` that we pass in, and thus it could be something totally 
unknown to us, and we cannot apply a cast...


The underlying idea is really about 100% flexibility in the PAPI layer.

That's also the reason why all stores need to provide a callback for the 
restore path. The Kafka Streams runtime can only read the record from the 
changelog, but it cannot put it into the store, as the runtime only sees 
the `StateStore` interface -- thus, we invoke a store-specific callback 
(`StateRestoreCallback` interface) that needs to actually put the data 
into the store for us. For our built-in stores, we of course provide 
these callbacks, but the point is that the runtime does not know 
anything about the nature of the store but is fully agnostic to it, to 
allow plugging in any custom store with any custom interface (which 
just needs to implement `StateStore`).
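
For illustration only (not from the KIP; the class name and the in-memory map
are placeholders): a minimal sketch of a custom store registering its own
restore callback, so the runtime can hand it the raw changelog records during
restore.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

public class MyCustomStore implements StateStore {

    private final String name;
    private final Map<Bytes, byte[]> data = new HashMap<>();
    private volatile boolean open = false;

    public MyCustomStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // The runtime only sees the StateStore interface; this callback is what
        // actually writes restored changelog records back into the store.
        context.register(root, this::restoreRecord);
        open = true;
    }

    @Deprecated
    @Override
    public void init(final org.apache.kafka.streams.processor.ProcessorContext context,
                     final StateStore root) {
        context.register(root, this::restoreRecord);
        open = true;
    }

    private void restoreRecord(final byte[] key, final byte[] value) {
        if (value == null) {
            data.remove(Bytes.wrap(key));   // tombstone
        } else {
            data.put(Bytes.wrap(key), value);
        }
    }

    @Override
    public void flush() { }

    @Override
    public void close() {
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}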



Not sure if I understand what you mean by this transformation step?



-Matthias


On 3/12/24 3:04 AM, Lucas Brutschy wrote:

@Matthias:

Thanks, I didn't realize that we need processors for any custom store.
Are we sure we cannot build a generic processor to load data into a
custom key-value store? I'm not sure, but you know the code better
than me.

One other alternative is to allow the user to provide a state
transformer `Function,
ConsumerRecord>` to adapt the state before loading it,
defaulting to identity. This would provide the ability to do
efficient, non-deserializing transformations like  => 

On Thu, Mar 7, 2024 at 7:19 PM Matthias J. Sax  wrote:


@Bruno:

(1), I think you are spot on for the ts-extractor: on the restore code
path, we only support record-ts, but there is no need for a custom-ts
because for regular changelog topics KS sets the ts, and thus, the
optimization this KIP proposes requires that the global topic follows the
changelog format, ie, the ts must be in the record-ts.

However, for the regular processing path, I am not sure if we can omit
deserializers. The way the PAPI is wired up, seems to require that we
give proper types to _other_ Processor that read from the global state
store. For this reason, the store (which takes `Serdes` with proper
types) is wrapped with a `MeteredStore` (like all others) to do the
Serde work, and this MeteredStore is also exposed to the
global-Processor? Might be good for Walker to dig into this to find out
the details?

It would of course be nice if we could avoid the unnecessary
deserialization on topic read, and re-serialization on global-store put
for this case, but it seems not to be straightforward to do...


(2). Is this about the PAPI/Topology? For this case, we don't have any
config object across the board. We only do this in the DSL. Hence, I
would propose to just follow the existing pattern in this KIP to keep
the API consistent. For the DSL, it could make sense of course. -- Of
course, if we think the PAPI could be improved with config objects, we
could do this in a dedicate KIP.


@Lucas:

The PAPI is unfortunately (by design) much more open and less
restrictive. If a user has a custom state store, we need some
`Processor` code from them, because we cannot provide a built-in
processor for an unknown store. The overload which won't take a
processor would only work for the built-in key-value store, which I
assume would cover most use-cases; however, we should keep the door open
for other use cases. Otherwise, we disallow this optimization for custom
stores. PAPI is really about flexibility, and yes, with great power
comes great responsibility for the users :)

But this actually highlights a different aspect: the overload not
accepting a custom `Processor` but using a built-in processor, should
not accept a generic `StoreBuilder` but should restrict the type to
`StoreBuilder`?


-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:

Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at a first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
   1) a copy-restore variant without custom processing, as you propose.
   2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-13 Thread Matthias J. Sax
even commit everything up to -- but not including -- the "bad" record when
PAUSE is triggered. Again, if we rebalance and "lose the pause" then
we'll just attempt to process it again, fail, and end up back in PAUSE. This
is no different than how successful processing works, no? Who cares if a
rebalance happens to strike and causes it to be PAUSED again?

All in all, I feel like these concerns are all essentially "true", but to me
they just seem like implementation or design decisions, and none of them strikes
me as posing an unsolvable problem for this feature. But maybe I'm
just lacking in imagination...

Thoughts?


On Fri, Mar 8, 2024 at 5:30 PM Matthias J. Sax  wrote:


Hey Nick,

I am sorry that I have to say that I am not a fan of this KIP. I see way
too many foot-guns and complications that can be introduced.

I am also not sure if I understand the motivation. You say CONTINUE and
FAIL are not good enough, but don't describe in detail why. If we
understand the actual problem better, it might also become clear how
task-pausing would help to address the problem.


The main problem I see, as already mentioned by Sophie, is time 
synchronization. However, it's not limited to joins, but affects all 
time-based operations, ie, also all windowed aggregations. If one task 
pauses but others keep running, we keep advancing stream-time downstream, 
and thus when the task would resume later, there is a very high 
probability that records are dropped as windows have already been closed.

For the runtime itself, we also cannot really do a cascading downstream 
pause, because the runtime does not know anything about the semantics of 
operators. We don't know if we execute a DSL operator or a PAPI 
operator. (We could maybe track all downstream tasks independent of 
semantics, but in the end it might just imply we could also just pause 
all tasks...)

For the "skip record case", it's also not possible to skip over an
offset from outside while the application is running. The offset in
question is cached inside the consumer and the consumer would not go
back to Kafka to re-read the offset (only when a partitions is
re-assigned to a new consumer, the consumer would fetch the offset once
to init itself). -- But even if the consumer would go back to read the
offset, as long as the partition is assigned to a member of the group,
it's not even possible to commit a new offset using some external tool.
Only member of the group are allowed to commit offset, and all tools
that allow to manipulate offsets require that the corresponding
application is stopped, and that the consumer group is empty (and the
tool will join the consumer group as only member and commit offsets).

Of course, we could pause all tasks, but that's kind of similar to shutting 
down? I agree though that `FAIL` is rather harsh, and it could be a 
good thing to introduce a graceful `SHUTDOWN` option (similar to what we 
have via the uncaught exception handler)?
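
(For reference, a minimal sketch of that existing handler-based path; the
surrounding class and method names are just placeholders:)

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ShutdownHandlerExample {
    public static void register(final KafkaStreams streams) {
        // REPLACE_THREAD and SHUTDOWN_CLIENT are the other available responses.
        streams.setUncaughtExceptionHandler(exception ->
            StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);
    }
}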

If we pause all tasks we would of course need to do this not just for a 
single instance, but for all... We do already have 
`KafkaStreams#pause()` but it does not include an application-wide pause, 
only an instance pause -- the assumption of this feature was that 
an external pause signal would be sent to all instances at the same 
time. Building it into KS was not done as it was potentially too complicated...

Other questions: if a task would be paused, would we commit the current
offset? What happens if we re-balance? Would we just lose the "pause"
state, and hit the same error again and just pause again?


Right now, I would rather propose to discard this KIP (or change the 
scope drastically to add a "global pause" and/or "global shutdown" 
option). Of course, if you can provide convincing answers, I am happy to 
move forward with per-task pausing. But my gut feeling is that even if 
we would find technically sound solutions, it would be way too 
complicated to use (and maybe also to implement inside KS) for too 
little benefit.



-Matthias



On 10/26/23 5:57 AM, Nick Telford wrote:

1.
Woops! I've fixed that now. Thanks for catching that.

2.
I agree, I'll remove the LogAndPause handler so it's clear this is an
advanced feature. I'll also add some documentation to
DeserializationExceptionResponse#SUSPEND that explains the care users
should approach it with.

3a.
This is interesting. My main concern is that there may be situations where
skipping a single bad record is not the necessary solution, but the Task
should still be resumed without restarting the application. For example, if
there are several bad records in a row that should be skipped.

3b.
Additionally, a Task may have multiple input topics, so we'd need some way
to indicate which record to skip.

These can probably be resolved by something like skipAndContinue(TaskId
task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task,
Map recordsToSkipByTopic)?

4.
Related to 2: I was

[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825496#comment-17825496
 ] 

Matthias J. Sax commented on KAFKA-16359:
-

Just to clarify: we cannot re-publish a 3.7.0 artifact. – We can only fix this 
with a 3.7.1 release. Seems we should push out 3.7.1 sooner rather than later.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: its {{META-INF/MANIFEST.MF}} bogusly includes a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in its {{META-INF/MANIFEST.MF}}, or 
> a new release should be published that corrects this defect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16357.
-
Resolution: Duplicate

> Kafka Client JAR manifest breaks javac linting
> --
>
> Key: KAFKA-16357
> URL: https://issues.apache.org/jira/browse/KAFKA-16357
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
> Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
>Reporter: Jacek Wojciechowski
>Priority: Critical
>
> I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project 
> is not building anymore.
> The reason is that kafka-clients-3.7.0.jar contains the following entry in 
> its JAR manifest file:
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
>  .5.jar slf4j-api-1.7.36.jar
> I'm using Maven repo to keep my dependencies and those files are not in the 
> same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's 
> Class-Path are not correct. It fails my build because we build with javac 
> with all linting options on, in particular -Xlint:-path. It produces the 
> following warnings coming from javac:
> [WARNING] COMPILATION WARNING : 
> [INFO] -
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> Since we also have the {{-Werror}} option enabled, it turns warnings into errors 
> and fails our build.
> I think our setup is quite typical: using a Maven repo to store dependencies, 
> having linting on and -Werror. Unfortunately, it doesn't work with the 
> latest kafka-clients because of the entries in the manifest's Class-Path. 
> And I think it might affect quite a lot of projects set up in a similar way.
> I don't know what the reason was to add the Class-Path entry in the JAR manifest 
> file - but perhaps this effect was not considered.
> It would be great if you removed the Class-Path entry from the JAR manifest 
> file.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Updated] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16357:

Priority: Critical  (was: Major)

> Kafka Client JAR manifest breaks javac linting
> --
>
> Key: KAFKA-16357
> URL: https://issues.apache.org/jira/browse/KAFKA-16357
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
> Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
>Reporter: Jacek Wojciechowski
>Priority: Critical
>
> I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project 
> is not building anymore.
> The reason is that kafka-clients-3.7.0.jar contains the following entry in 
> its JAR manifest file:
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
>  .5.jar slf4j-api-1.7.36.jar
> I'm using Maven repo to keep my dependencies and those files are not in the 
> same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's 
> Class-Path are not correct. It fails my build because we build with javac 
> with all linting options on, in particular -Xlint:-path. It produces the 
> following warnings coming from javac:
> [WARNING] COMPILATION WARNING : 
> [INFO] -
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> Since we also have the {{-Werror}} option enabled, it turns warnings into errors 
> and fails our build.
> I think our setup is quite typical: using a Maven repo to store dependencies, 
> having linting on and -Werror. Unfortunately, it doesn't work with the 
> latest kafka-clients because of the entries in the manifest's Class-Path. 
> And I think it might affect quite a lot of projects set up in a similar way.
> I don't know what the reason was to add the Class-Path entry in the JAR manifest 
> file - but perhaps this effect was not considered.
> It would be great if you removed the Class-Path entry from the JAR manifest 
> file.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-03-11 Thread Matthias J. Sax

Without detailed logs (maybe even DEBUG) hard to say.

But from what you describe, it could be a metadata issue? Why are you 
setting



METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)


Refreshing metadata has nothing to do with rebalances, and a metadata 
refresh does not trigger a rebalance.




-Matthias


On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:

Hi all,

A Kafka Streams application sometimes stops consuming events during load 
testing. Please find below the details:

Details of the app:


   *   Kafka Streams Version: 3.5.1
   *   Kafka: AWS MSK v3.6.0
   *   Consumes events from 6 topics
   *   Calls APIs to enrich events
   *   Sometimes joins two streams
   *   Produces enriched events in output topics

Runs on AWS ECS:

   *   Each task has 10 streaming threads
   *   Autoscaling based on offset lags and a maximum of 6 ECS tasks
   *   Input topics have 60 partitions each to match 6 tasks * 10 threads
   *   Fairly good spread of events across all topic partitions using 
partitioning keys

Settings and configuration:


   *   At least once semantics
   *   MAX_POLL_RECORDS_CONFIG: 10
   *   APPLICATION_ID_CONFIG

// Make rebalances rare and prevent stop-the-world rebalances

   *   Static membership (using GROUP_INSTANCE_ID_CONFIG)
   *   METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to 
make rebalances rare)
   *   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
   *   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

State store related settings:

   *   TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
   *   STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
   *   NUM_STANDBY_REPLICAS_CONFIG: 1


Symptoms:
The symptoms mentioned below occur during load tests:

Scenario# 1:
Steady input event stream

Observations:

   *   Gradually increasing offset lags which shouldn't happen normally as the 
streaming app is quite fast
   *   Events get processed

Scenario# 2:
No input events after the load test stops producing events

Observations:

   *   Offset lag stuck at ~5k
   *   Stable consumer group
   *   No events processed
   *   No errors or messages in the logs


Scenario# 3:
Restart the app when it stops processing events although offset lags are not 
zero

Observations:

   *   Offset lags start reducing and events start getting processed

Scenario# 4:
Transient errors occur while processing events


   *   A custom exception handler that implements 
StreamsUncaughtExceptionHandler returns 
StreamThreadExceptionResponse.REPLACE_THREAD in the handle method
   *   If transient errors keep occurring occasionally and threads get 
replaced, the problem of the app stalling disappears.
   *   But if transient errors don't occur, the app tends to stall and I need 
to manually restart it


Summary:

   *   It appears that some streaming threads stall after processing for a 
while.
   *   It is difficult to change log level for Kafka Streams from ERROR to INFO 
as it starts producing a lot of log messages especially during load tests.
   *   I haven't yet managed to push Kafka streams metrics into AWS OTEL 
collector to get more insights.

Can you please let me know if any Kafka Streams config settings need changing? 
Should I reduce the values of any of these settings to help trigger rebalancing 
early and hence assign partitions to members that are active:


   *   METADATA_MAX_AGE_CONFIG: 5 hours in millis (to make rebalances rare)
   *   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
   *   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

Should I get rid of static membership? This may increase rebalancing but may 
be okay if it can prevent stalled threads from appearing as active members.

Should I try upgrading Kafka Streams to v3.6.0 or v3.7.0? Hoping that v3.7.0 
will be compatible with AWS MSK v3.6.0.


Thank you very much.

Kind regards,
Venkatesh

UTS CRICOS Provider Code: 00099F DISCLAIMER: This email message and any 
accompanying attachments may contain confidential information. If you are not 
the intended recipient, do not read, use, disseminate, distribute or copy this 
message or attachments. If you have received this message in error, please 
notify the sender immediately and delete this message. Any views expressed in 
this message are those of the individual sender, except where the sender 
expressly, and with authority, states them to be the views of the University of 
Technology Sydney. Before opening any attachments, please check them for 
viruses and defects. Think. Green. Do. Please consider the environment before 
printing this email.



Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-03-11 Thread Matthias J. Sax
Thanks for the KIP Shashwat. Closing this testing gap is great! It did 
come up a few times already...


One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are set up, it seems that 
the regular context and fixed-key-context are distinct, and thus I 
believe both mock-context classes should be distinct, too?


What I mean is that FixedKeyProcessorContext does not extend 
ProcessorContext. Both classes have a common parent ProcessINGContext 
(note the very similar but different names), but they are "siblings" 
only, so why give the mock contexts a parent-child relationship?


It seems better to do

public class MockFixedKeyProcessorContext
    implements FixedKeyProcessorContext,
               RecordCollector.Supplier


Of course, if there is code we can share between both mock-contexts we 
should do this, but it should not leak into the public API?
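
For context, the kind of processor such a mock context would be used to
unit-test -- a trivial, made-up example (not from the KIP):

import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class UppercaseValueProcessor implements FixedKeyProcessor<String, String, String> {

    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // withValue() returns a new record with the same (fixed) key.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}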



-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext

This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey



Re: [DISCUSS] KIP-1020: Deprecate `window.size.ms` in `StreamsConfig`

2024-03-11 Thread Matthias J. Sax
Yes, it's used inside `TimeWindowedSerializer` and actually also inside 
`TimeWindowedDeserializer`.


However, it does IMHO not change that we should remove it from 
`StreamsConfig` because both configs are not intended to be used in Java 
code... If one writes Java code, they should use


  new TimeWindowedSerializer(Serializer)
  new TimeWindowedDeserializer(Deserializer, Long)
  new WindowedSerdes.TimeWindowedSerde(Serde, Long)

and thus they don't need either config.
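
For example (purely illustrative, assuming the current public constructors
and String serdes):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public class WindowedSerdeExample {
    public static void main(final String[] args) {
        final long windowSizeMs = 60_000L;

        // All three take the inner (de)serializer and the window size directly,
        // so neither window.size.ms nor the inner-class config is needed.
        final TimeWindowedSerializer<String> serializer =
            new TimeWindowedSerializer<>(Serdes.String().serializer());
        final TimeWindowedDeserializer<String> deserializer =
            new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSizeMs);
        final Serde<Windowed<String>> serde =
            new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), windowSizeMs);
    }
}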

The configs are only needed for command line tools, which create the 
(de)serializer via reflection using the default constructor.


Does this make sense?



The only open question is really if and where to add them... Strictly 
speaking, we don't need either config as a public variable, as nobody 
should use them in Java code. To me, it just feels right/better to make 
them public for documentation purposes, to signal that these configs exist?


`inner.window.class.serde` has "serde" in its name, so we could add it 
to `WindowedSerdes.TimeWindowedSerde`? For `window.size.ms`, it's only used by the 
deserializer, so maybe add it there? Just some ideas. -- Or we sidestep 
this question and just don't add them; also fine with me.



-Matthias

On 3/11/24 10:53 AM, Lucia Cerchie wrote:

PS-- I was re-reading the PR that originated this discussion and realized
that `window.inner.serde.class` is used here
<https://github.com/a0x8o/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java#L44>
in KStreams. This goes against removing it, yes?

On Mon, Mar 11, 2024 at 10:40 AM Lucia Cerchie 
wrote:


Sophie, I'll add a paragraph about removing `windowed.inner.serde.class`
to the KIP. I'll also add putting it in the `TimeWindowedSerde` class with
some add'tl guidance on the docs addition.

Also, I double-checked setting window.size.ms on the client and it
doesn't throw an error at all, in response to Matthias's question. Changing
the KIP in response to that.

On Sun, Mar 10, 2024 at 6:04 PM Sophie Blee-Goldman 
wrote:


Thanks for responding Matthias -- you got there first, but I was going to
say exactly the same thing as in your most recent reply. In other words, I see the
`windowed.inner.serde.class` as being in the same boat as the `
window.size.ms` config, so whatever we do with one we should do for the
other.

I do agree with removing these from StreamsConfig, but defining them in
ConsumerConfig feels weird as well. There's really no great answer here.

My only concern about adding it to the corresponding
serde/serializer/deserializer class is that it might be difficult for
people to find them. I generally assume that people tend not to look at
the
serde/serializer/deserializer classes/implementations. But maybe in this
case, someone who actually needed these configs would be likely to be
motivated enough to find them by looking at the class? And with sufficient
documentation, it's likely not a problem. So, I'm +1 on putting it into
the
TimeWindowedSerde class

(I would personally stick them into the serde class, rather than the
serializer and/or deserializer, but I could be convinced either way)

On Fri, Mar 1, 2024 at 3:00 PM Matthias J. Sax  wrote:


One more thought after I did some more digging on the related PR.

Should we do the same thing for `windowed.inner.serde.class`?


Both configs belong to windowed serdes (which KS provides) but the KS code
itself never uses them (and in fact, disallows using them and would
throw an error if used). Both are intended for plain consumer use cases
for which the windowed serdes are used.

The question to me is, should we add them back somewhere else? They do
not really belong into `ConsumerConfig` either, but maybe we could add
them to the corresponding serde or (de)serializer classes?


-Matthias


On 2/21/24 2:41 PM, Matthias J. Sax wrote:

Thanks for the KIP. Sounds like a nice cleanup.


window.size.ms  is not a true KafkaStreams config, and results in an
error when set from a KStreams application


What error?


Given that the config is used by `TimeWindowedDeserializer`, I am
wondering if we should additionally add

public class TimeWindowedDeserializer {

  public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

}



-Matthias


On 2/15/24 6:32 AM, Lucia Cerchie wrote:

Hey everyone,

I'd like to discuss KIP-1020
<



https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804

,

which would move to deprecate `window.size.ms` in `StreamsConfig`

since `

window.size.ms` is a client config.

Thanks in advance!

Lucia Cerchie








--

[image: Confluent] <https://www.confluent.io>
Lucia Cerchie
Developer Advocate
Follow us: [image: Blog]
<https://www.confluent.io/blog?utm_source=footer_medium=email_campaign=ch.email-signature_type.community_content.blog>[image:
Twitter] <https://twitter.com/ConfluentInc>[image: Slack]
<https://slackpass.io/confluentcommunity>[image: YouTube]
<https://youtube.com/

[jira] [Resolved] (KAFKA-16360) Release plan of 3.x kafka releases.

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16360.
-
Resolution: Invalid

Please don't use Jira to ask questions. Jira tickets are for bug reports and 
features only.

Question should be asked on the user and/or dev mailing lists: 
https://kafka.apache.org/contact

> Release plan of 3.x kafka releases.
> ---
>
> Key: KAFKA-16360
> URL: https://issues.apache.org/jira/browse/KAFKA-16360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Major
>
> KIP 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline]
>  mentions ,
> h2. Kafka 3.7
>  * January 2024
>  * Final release with ZK mode
> But we see in Jira, some tickets are marked for 3.8 release. Does apache 
> continue to make 3.x releases having zookeeper and kraft supported 
> independent of pure kraft 4.x releases ?
> If yes, how many more releases can be expected on 3.x release line ?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Created] (KAFKA-16366) Refactor KTable source optimization

2024-03-11 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16366:
---

 Summary: Refactor KTable source optimization
 Key: KAFKA-16366
 URL: https://issues.apache.org/jira/browse/KAFKA-16366
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams DSL offers an optimization to re-use an input topic as table 
changelog, in favor of creating a dedicated changelog topic.

So far, the Processor API did not support any such feature, and thus when the 
DSL compiles down into a Topology, we needed to access topology internal stuff 
to allow for this optimization.

With KIP-813 (merged for AK 3.8), we added `Topology#addReadOnlyStateStore` as 
public API, and thus we should refactor the DSL compilation code, to use this 
public API to build the `Topology` instead of internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2024-03-08 Thread Matthias J. Sax
ds per sec at peak hours. For the reason above, even if "fat" messages are
required, the performance will be better with aggregateJoin based on the KIP-955
design, where messages are aggregated per stream event and not per table update.

Disclaimer: I did not test both solutions side by side for performance. For
now I am just using design observations for performance/scalability
projections.

Any additions to pros/cons? Any other solution alternatives?

Regards,

Igor



On Thu, Aug 10, 2023 at 7:58 PM Matthias J. Sax  wrote:


Thanks. Seems we are on the same page now about what the requirements are.
That's good progress!



This solution was considered when in KIP-213 for the existing
table-table FK join. There is a discussion on disadvantages of using this
approach in the article related to KIP-213 and I think the same
disadvantages will apply to this KIP.


I am not sure. The difference (to me) is that for KIP-213, if we
aggregate the right input table, we would need to split the "fat result"
record to flatten the individual result records we want to have. But for
your case it seems you want to have a single "fat" result record in the
end, so the aggregation is not a workaround but a requirement anyway? If
we go with KIP-955, your use case requires an aggregation (right?)
because for each input stream record, you want one result record (not
multiple?).



I see FK join as the common operation in data manipulation so it would
be nice to have a shortcut for it and not to try to design it from

existing

functionality all the time.


Well, yes and no? In the end, a stream-table join is an _enrichment_
join, ie, for each left input stream event, we emit one (or none, if it
does not match for inner join) result record. A stream-FK-table-join
would imply that we emit multiple result records, which is (at least to
me) a somewhat weird behavior, because it's kinda "flatMap" as a join
side-effect. (Or we add in an aggregation, and again, have a single
composed operator with "weird" semantics.) It does not seem
semantically clean to me to do it this way.



Consider the real use case I discussed at the
beginning when a business entity has 25 children


Not sure if I fully understand? Are you saying a single stream record
would join with 25 table rows? And that's why you think you cannot
aggregate those 25 rows, because such a "fat row" would be too large? If
this is the case (and I am correct about my understanding that your use
case needs an aggregation step anyway), then this issue does not go away,
because you build a single "fat" result record containing all these 25
rows as the final result anyway.



This solution, similarly to mine, is "muddying the water" by providing a
hybrid outcome join + aggregate. At least with my proposal we could
potentially control it with the flag, or maybe create some special
aggregate that could be chained after (don't know how to do it yet :-))


Why would it muddy the waters if you combine multiple operators? If you
apply an aggregation and a join operator, both operators provide
well-known and clean semantics. To me, "muddying the waters" means to
have a single operator that does "too much" at once (and adding a config
makes it even worse IMHO, as it then actually merges even more things
into a single operator).

  From my POV, a good DSL is a tool set of operators each doing one (well
defined) thing, and you can combine them to do complex stuff. Building
"fat uber" operators is the opposite of it IMHO.

I am still on the fence about whether KIP-955 proposes a well-defined operator or
not, because it seems it's either a flatMap+join or join+aggregation --
for both cases, I am wondering why we would want to combine them into a
single operator?

To me, there are two good arguments for adding a new operator:

   (1) It's not possible to combine existing operators to semantically
express the same at all.

   (2) Adding the operator provides significant performance improvements
compared to combining a set of existing operators.

Do we think one of these two cases applies?
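
Purely illustrative (not part of the KIP): one way to express the "aggregate
first, then enrich" composition with existing DSL operators. Topic names, the
pipe-delimited String values, and the assumption that the order stream is
already keyed by the parent id are all placeholders.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class JoinPlusAggregateExample {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Child rows keyed by childId; the value is "parentId|payload".
        final KTable<String, String> children =
            builder.table("children", Consumed.with(Serdes.String(), Serdes.String()));

        // Step 1: re-key the table by the foreign key and aggregate all children
        // of a parent into one "fat" value.
        final KTable<String, String> childrenByParent = children
            .groupBy(
                (childId, value) -> KeyValue.pair(value.split("\\|", 2)[0], value),
                Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> "",
                (parentId, value, agg) -> agg + value + ";",             // adder
                (parentId, value, agg) -> agg.replace(value + ";", ""),  // subtractor
                Materialized.with(Serdes.String(), Serdes.String()));

        // Step 2: plain stream-table enrichment join against the aggregated table
        // (assuming the order stream is keyed by parentId).
        final KStream<String, String> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
        orders
            .join(childrenByParent, (order, kids) -> order + " -> [" + kids + "]")
            .to("enriched-orders");

        return builder.build();
    }
}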


Let's call a stream-fk-join that emits multiple result records the
"flatMapJoin" and the stream-fk-join that emits a single "fat" result
record the "aggregationJoin".

If my understanding is correct, and you need an aggregation anyway,
adding a flatMapJoin that needs an additional aggregation downstream does
not work anyway, because the aggregation cannot know when to start a new
aggregation... Assume there are two input events, both with orderId1; the
first joins to two table rows, emitting two flatMapJoin result records,
and the second joins to three table rows, emitting three flatMapJoin
records. How would the downstream aggregation know to put records 1+2
and records 3+4+5 together to get back to the original two input records?

If flatMapJoin does not work, and we go with aggregationJoin, I w

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-08 Thread Matthias J. Sax

Hey Nick,

I am sorry that I have to say that I am not a fan of this KIP. I see way 
too many foot-guns and complications that can be introduced.


I am also not sure if I understand the motivation. You say CONTINUE and 
FAIL are not good enough, but don't describe in detail why. If we 
understand the actual problem better, it might also become clear how 
task-pausing would help to address the problem.



The main problem I see, as already mentioned by Sophie, is time 
synchronization. However, it's not limited to joins, but affects all 
time-based operations, ie, also all windowed aggregations. If one task 
pauses but others keep running, we keep advancing stream-time downstream, 
and thus when the task would resume later, there is a very high 
probability that records are dropped as windows have already been closed.


For the runtime itself, we also cannot really do a cascading downstream 
pause, because the runtime does not know anything about the semantics of 
operators. We don't know if we execute a DSL operator or a PAPI 
operator. (We could maybe track all downstream tasks independent of 
semantics, but in the end it might just imply we could also just pause 
all tasks...)


For the "skip record case", it's also not possible to skip over an 
offset from outside while the application is running. The offset in 
question is cached inside the consumer and the consumer would not go 
back to Kafka to re-read the offset (only when a partitions is 
re-assigned to a new consumer, the consumer would fetch the offset once 
to init itself). -- But even if the consumer would go back to read the 
offset, as long as the partition is assigned to a member of the group, 
it's not even possible to commit a new offset using some external tool. 
Only member of the group are allowed to commit offset, and all tools 
that allow to manipulate offsets require that the corresponding 
application is stopped, and that the consumer group is empty (and the 
tool will join the consumer group as only member and commit offsets).


Of course, we could pause all tasks, but that's kind of similar to shutting 
down? I agree though that `FAIL` is rather harsh, and it could be a 
good thing to introduce a graceful `SHUTDOWN` option (similar to what we 
have via the uncaught exception handler)?


If we pause all tasks we would of course need to do this not just for a 
single instance, but for all... We do already have 
`KafkaStreams#pause()` but it does not include an application-wide pause, 
only an instance pause -- the assumption of this feature was that 
an external pause signal would be sent to all instances at the same 
time. Building it into KS was not done as it was potentially too complicated...


Other questions: if a task would be paused, would we commit the current 
offset? What happens if we re-balance? Would we just lose the "pause" 
state, and hit the same error again and just pause again?



Right now, I would rather propose to discard this KIP (or change the 
scope drastically to add a "global pause" and/or "global shutdown" 
option). Of course, if you can provide convincing answers, I am happy to 
move forward with per-task pausing. But my gut feeling is that even if 
we would find technically sound solutions, it would be way too 
complicated to use (and maybe also to implement inside KS) for too 
little benefit.




-Matthias



On 10/26/23 5:57 AM, Nick Telford wrote:

1.
Woops! I've fixed that now. Thanks for catching that.

2.
I agree, I'll remove the LogAndPause handler so it's clear this is an
advanced feature. I'll also add some documentation to
DeserializationExceptionResponse#SUSPEND that explains the care users
should approach it with.

3a.
This is interesting. My main concern is that there may be situations where
skipping a single bad record is not the necessary solution, but the Task
should still be resumed without restarting the application. For example, if
there are several bad records in a row that should be skipped.

3b.
Additionally, a Task may have multiple input topics, so we'd need some way
to indicate which record to skip.

These can probably be resolved by something like skipAndContinue(TaskId
task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task,
Map recordsToSkipByTopic)?

4.
Related to 2: I was thinking that users implementing their own handler may
want to be able to determine which Processors (i.e. which Subtopology/task
group) are being affected, so they can programmatically make a decision on
whether it's safe to PAUSE. ProcessorContext, which is already a parameter
to DeserializationExceptionHandler provides the TaskId of the failed Task,
but doesn't provide metadata on the Processors that Task executes.

Since TaskIds are non-deterministic (they can change when you modify your
topology, with no influence over how they're assigned), a user cannot use
TaskId alone to determine which Processors would be affected.

What do you think would be the best way to provide this information to

[jira] [Updated] (KAFKA-14748) Relax non-null FK left-join requirement

2024-03-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14748:

Fix Version/s: 3.7.0

> Relax non-null FK left-join requirement
> ---
>
> Key: KAFKA-14748
> URL: https://issues.apache.org/jira/browse/KAFKA-14748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> Kafka Streams enforces a strict non-null-key policy in the DSL across all 
> key-dependent operations (like aggregations and joins).
> This also applies to FK-joins, in particular to the ForeignKeyExtractor. If 
> it returns `null`, it's treated as invalid. For left-joins, it might make 
> sense to still accept a `null`, and add the left-hand record with an empty 
> right-hand-side to the result.
> KIP-962: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2024-03-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12317:

Fix Version/s: 3.7.0

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> Currently, for a stream-streams and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`{-}key (`null`-join-key 
> for stream-globalTable), because for a `null`{-}(join)key the join is 
> undefined: ie, we don't have an attribute the do the table lookup (we 
> consider the stream-record as malformed). Note, that we define the semantics 
> of _left/outer_ join as: keep the stream record if no matching join record 
> was found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat is as 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires to change the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key record 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. User need to be aware (ie, we might need to 
> put this into the docs and JavaDocs), that records with `null`-key would be 
> partitioned randomly.
> KIP-962: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15576) Add 3.6.0 to broker/client and streams upgrade/compatibility tests

2024-03-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15576.
-
Resolution: Fixed

> Add 3.6.0 to broker/client and streams upgrade/compatibility tests
> --
>
> Key: KAFKA-15576
> URL: https://issues.apache.org/jira/browse/KAFKA-15576
> Project: Kafka
>  Issue Type: Task
>Reporter: Satish Duggana
>Priority: Major
> Fix For: 3.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing

2024-03-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16025:

Fix Version/s: 3.7.1

> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> 
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
> Environment: Linux
>Reporter: Sabit
>Assignee: Sabit
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Hello,
>  
> We are encountering an issue where during rebalancing, we see streams threads 
> on one client get stuck in rebalancing. Upon enabling debug logs, we saw that 
> some tasks were having issues initializing due to failure to grab a lock in 
> the StateDirectory:
>  
> {{2023-12-14 22:51:57.352000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] 
> Failed to lock the state directory for task 0_51; will retry}}
>  
> We were able to reproduce this behavior reliably on 3.4.0. This is the 
> sequence that triggers the bug.
> Assume in a streams consumer group, there are 5 instances (A, B, C, D, E), 
> each with 5 threads (1-5), and the consumer is using stateful tasks which 
> have state stores on disk. There are 10 active tasks and 10 standby tasks.
>  # Instance A is deactivated
>  # As an example, lets say task 0_1, previously on instance B, moves to 
> instance C
>  # Task 0_1 leaves behind it's state directory on Instance B's disk, 
> currently unused, and no lock for it exists in Instance B's StateDirectory 
> in-memory lock tracker
>  # Instance A is re-activated
>  # Streams thread 1 on Instance B is asked to re-join the consumer group due 
> to a new member being added
>  # As part of re-joining, thread 1 lists non-empty state directories in order 
> to report the offset's it has in it's state stores as part of it's metadata. 
> Thread 1 sees that the directory for 0_1 is not empty.
>  # The cleanup thread on instance B runs. The cleanup thread locks state 
> store 0_1, sees the directory for 0_1 was last modified more than 
> `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully
>  # Thread 1 takes a lock on directory 0_1 due to it being found not-empty 
> before, unaware that the cleanup has run between the time of the check and 
> the lock. It tracks this lock in it's own in-memory store, in addition to 
> StateDirectory's in-memory lock store
>  # Thread 1 successfully joins the consumer group
>  # After every consumer in the group joins the group, assignments are 
> calculated, and then every consumer calls sync group to receive the new 
> assignments
>  # Thread 1 on Instance B calls sync group but gets an error - the group 
> coordinator has triggered a new rebalance and all members must rejoin the 
> group
>  # Thread 1 again lists non-empty state directories in order to report the 
> offset's it has in it's state stores as part of it's metadata. Prior to doing 
> so, it clears it's in-memory store tracking the locks it has taken for the 
> purpose of gathering rebalance metadata
>  # Thread 1 no longer takes a lock on 0_1 as it is empty
>  # However, that lock on 0_1 owned by Thread 1 remains in StateDirectory
>  # All consumers re-join and sync successfully, receiving their new 
> assignments
>  # Thread 2 on Instance B is assigned task 0_1
>  # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is 
> still being held by Thread 1
>  # Thread 2 remains in rebalancing state, and cannot make progress on task 
> 0_1, or any other tasks it has assigned.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics

2024-03-07 Thread Matthias J. Sax
Seems I am late to this party. Can we pick this up again, aiming for the 3.8 
release? I think it would be a great addition. A few comments:



- I think it does make sense to report `iterator-duration-avg` and 
`iterator-duration-max` for all *closed* iterators -- it just seems to 
be a useful metric (wondering if this would be _overall_ or bounded to 
some time window?)


- About the duration iterators are currently open, I believe the only 
useful way is to report the "oldest iterator", ie, the smallest iterator 
open-time of all currently open iterators? We all agree that in general, 
leaking iterators would bump the count metric, and if there are a few 
which are not closed and open for a long time, it seems sufficient to 
detect the single oldest one for alerting purposes.


- What I don't like about the KIP is its focus on RocksDB. I don't think 
we should build on the internal RocksDB counters as proposed (I guess 
we could still expose them, similar to other RocksDB metrics which we 
expose already). However, for this new metric, we should track it 
ourselves and thus make it independent of RocksDB -- in the end, an 
in-memory store could also leak memory (and kill a JVM with an 
out-of-memory error) and we should be able to track it. (A rough sketch 
of what I mean by tracking it ourselves is below, after these comments.)


- Not sure if we would like to add support for custom stores, to allow 
them to register their iterators with this metric? Or would this not be 
necessary, because custom stores could just register a custom metric 
about it to begin with?
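
A rough, store-agnostic sketch of what tracking this ourselves could look like
(the metric plumbing -- openCount, durationRecorder -- is a stand-in for
whatever sensors the KIP ends up defining):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

public class TrackedKeyValueIterator<K, V> implements KeyValueIterator<K, V> {

    private final KeyValueIterator<K, V> inner;
    private final AtomicInteger openCount;
    private final Consumer<Duration> durationRecorder;
    private final long startNanos = System.nanoTime();

    public TrackedKeyValueIterator(final KeyValueIterator<K, V> inner,
                                   final AtomicInteger openCount,
                                   final Consumer<Duration> durationRecorder) {
        this.inner = inner;
        this.openCount = openCount;
        this.durationRecorder = durationRecorder;
        openCount.incrementAndGet();
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public KeyValue<K, V> next() {
        return inner.next();
    }

    @Override
    public K peekNextKey() {
        return inner.peekNextKey();
    }

    @Override
    public void close() {
        inner.close();
        // Decrement the open-iterator gauge and record this iterator's lifetime.
        openCount.decrementAndGet();
        durationRecorder.accept(Duration.ofNanos(System.nanoTime() - startNanos));
    }
}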




-Matthias

On 10/25/23 4:41 PM, Sophie Blee-Goldman wrote:


  If we used "iterator-duration-max", for
example, would it not be confusing that it includes Iterators that are
still open, and therefore the duration is not yet known?



1. Ah, I think I understand your concern better now -- I totally agree that
an "iterator-duration-max" metric would be confusing/misleading. I was
thinking about it a bit differently, something more akin to the
"last-rebalance-seconds-ago" consumer metric. As the name suggests,
that basically just tracks how long the consumer has gone without
rebalancing -- it doesn't purport to represent the actual duration between
rebalances, just the current time since the last one. The hard part is really
in choosing a name that reflects this -- maybe you have some better ideas
but off the top of my head, perhaps something like "iterator-lifetime-max"?

2. I'm not quite sure how to interpret the "iterator-duration-total" metric
-- what exactly does it mean to add up all the iterator durations? For
some context, while this is not a hard-and-fast rule, in general you'll
find that Kafka/Streams metrics tend to come in pairs of avg/max or
rate/total. Something that you might measure the avg for usually is
also useful to measure the max, whereas a total metric is probably
also useful as a rate but not so much as an avg. I actually think this
is part of why it feels like it makes so much sense to include a "max"
version of this metric, as Lucas suggested, even if the name of
"iterator-duration-max" feels misleading. Ultimately the metric names
are up to you, but for this reason, I would personally advocate for
just going with an "iterator-duration-avg" and "iterator-duration-max"

I did see your example in which you mention one could monitor the
rate of change of the "-total" metric. While this does make sense to
me, if the only way to interpret a metric is by computing another
function over it, then why not just make that computation the metric
and cut out the middle man? And in this case, to me at least, it feels
much easier to understand a metric like "iterator-duration-max" vs
something like "iterator-duration-total-rate"

3. By the way, can you add another column to the table with the new metrics
that lists the recording level? My suggestion would be to put the
"number-open-iterators" at INFO and the other two at DEBUG. See
the following for my reasoning behind this recommendation

4. I would change the "Type" entry for the "number-open-iterators" from
"Value" to "Gauge". This helps justify the "INFO" level for this metric,
since unlike the other metrics which are "Measurables", the current
timestamp won't need to be retrieved on each recording

5. Can you list the tags that would be associated with each of these
metrics (either in the table, or separately above/below if they will be
the same for all)

6. Do you have a strong preference for the name "number-open-iterators"
or would you be alright in shortening this to "num-open-iterators"? The
latter is more in line with the naming scheme used elsewhere in Kafka
for similar kinds of metrics, and a shorter name is always nice.

7. With respect to the rocksdb cache metrics, those sound useful but
if it was me, I would probably save them for a separate KIP mainly just
because the KIP freeze deadline is in a few weeks, and I wouldn't want
to end up blocking all the new metrics just because there was ongoing
debate about a subset of them. That said, you do have 3 full 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-07 Thread Matthias J. Sax

@Bruno:

(1), I think you are spot on for the ts-extractor: on the restore code 
path, we only support record-ts, but there is no need for a custom-ts 
because for regular changelog topics KS sets the ts, and thus, the 
optimization this KIP proposes requires that the global topic follows the 
changelog format, ie, the ts must be in the record-ts.


However, for the regular processing path, I am not sure if we can omit 
deserializers. The way the PAPI is wired up seems to require that we 
give proper types to _other_ Processors that read from the global state 
store. For this reason, the store (which takes `Serdes` with proper 
types) is wrapped with a `MeteredStore` (like all others) to do the 
Serde work, and this MeteredStore is also exposed to the 
global-Processor? Might be good for Walker to dig into this to find out 
the details?


It would of course be nice if we could avoid the unnecessary 
deserialization on topic read, and re-serialization on global-store put, 
for this case, but it seems not to be straightforward to do...



(2). Is this about the PAPI/Topology? For this case, we don't have any 
config object across the board. We only do this in the DSL. Hence, I 
would propose to just follow the existing pattern in this KIP to keep 
the API consistent. For the DSL, it could make sense of course. -- Of 
course, if we think the PAPI could be improved with config objects, we 
could do this in a dedicated KIP.



@Lucas:

The PAPI is unfortunately (by design) much more open and less 
restrictive. If a user has a custom state store, we need some 
`Processor` code from them, because we cannot provide a built-in 
processor for an unknown store. The overload which won't take a 
processor would only work for the built-in key-value store, which I 
assume would cover most use-cases; however, we should keep the door open 
for other use cases. Otherwise, we disallow this optimization for custom 
stores. PAPI is really about flexibility, and yes, with great power 
comes great responsibility for the users :)


But this actually highlights a different aspect: the overload not 
accepting a custom `Processor` but using a built-in processor, should 
not accept a generic `StoreBuilder` but should restrict the type to 
`StoreBuilder`?
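
Just to illustrate the shape of what I mean -- a rough sketch only, names 
and generics are not what the KIP proposes:

{code:java}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

// Hypothetical shape of the two overloads, for illustration only.
interface GlobalStoreApiSketch {

    // With a custom processor: any store type is allowed, because the
    // user-supplied processor knows how to update it.
    <KIn, VIn> Topology addGlobalStore(
        StoreBuilder<?> storeBuilder,
        String sourceName,
        Deserializer<KIn> keyDeserializer,
        Deserializer<VIn> valueDeserializer,
        String topic,
        String processorName,
        ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier);

    // Without a custom processor: restricted to the built-in key-value store,
    // so the built-in processor can copy records into the store as-is.
    <K, V> Topology addGlobalStore(
        StoreBuilder<KeyValueStore<K, V>> storeBuilder,
        String sourceName,
        Deserializer<K> keyDeserializer,
        Deserializer<V> valueDeserializer,
        String topic);
}
{code}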



-Matthias



On 3/6/24 1:14 PM, Lucas Brutschy wrote:

Hey Walker

Thanks for the KIP, and congrats on the KiBiKIP ;)

My main point is that I'd vote against introducing
`reprocessOnRestore`. The behavior for `reprocessOnRestore = false` is
just incorrect and should be removed or deprecated. If we think we
need to keep the old behavior around, renaming the methods, e.g., to
`addGlobalReadOnlyStore`, is a great opportunity to deprecate the old
behavior. But at a first glance, the old behavior just looks like a
bug to me and should just be removed.

So for this KIP, I'd keep two variants as you propose and drop the
boolean parameter, but the two variants will be
  1) a copy-restore variant without custom processing, as you propose.
  2) a process-restore variant with custom processing (parameters the
same as before). This should be combined with a clear warning in the
Javadoc of the performance downside of this approach.

Presentation:
1) I wonder if you could make another pass on the motivation section.
I was lacking some context on this problem, and I think the nature of
the restore issue only became clear to me when I read through the
comments in the JIRA ticket you linked.
2) If we decide to keep the parameter `reprocessOnRestore`, the
Javadoc on it should be extended. This is a somewhat subtle issue, and
I don't think `restore by reprocessing` is enough of an explanation.

Nits:

`{@link ValueTransformer ValueTransformer}` -> `{@link
ValueTransformer ValueTransformers}`
`user defined` -> `user-defined`

Cheers,
Lucas

On Wed, Mar 6, 2024 at 9:55 AM Bruno Cadonna  wrote:


Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key
and value deserializer in Topology#addGlobalStore() that do not take a
ProcessorSupplier? What about Consumed in StreamsBuilder#addGlobalStore()?
Since those methods set up a global state store that does not process any
records, do they still need to deserialize records and extract
timestamps? Name might still be needed, right?

2.
  From an API point of view, it might make sense to put all
processor-related arguments into a parameter object. Something like:
GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.
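
A rough sketch of what such a -- purely hypothetical -- parameter object 
could look like:

{code:java}
import org.apache.kafka.common.serialization.Serde;

// Hypothetical parameter object, only to illustrate the idea above; not part of any API.
public class GlobalStoreParameters<K, V> {

    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private boolean reprocessOnRestore = true;

    private GlobalStoreParameters() {
    }

    public static <K, V> GlobalStoreParameters<K, V> globalStore() {
        return new GlobalStoreParameters<>();
    }

    public GlobalStoreParameters<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }

    public GlobalStoreParameters<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    public GlobalStoreParameters<K, V> disableReprocessOnRestore() {
        this.reprocessOnRestore = false;
        return this;
    }

    public Serde<K> keySerde() {
        return keySerde;
    }

    public Serde<V> valueSerde() {
        return valueSerde;
    }

    public boolean reprocessOnRestore() {
        return reprocessOnRestore;
    }
}
{code}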

3.
Could you please go over the KIP and correct typos and other mistakes in
the KIP?


Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on
"restore reprocessing" is certainly a good improvement.

  From an API design POV, I like the idea to not

[jira] [Updated] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16350:

Summary: StateUpdater does not init transaction after canceling task close 
action  (was: StateUpdated does not init transaction after canceling task close 
action)

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16350) StateUpdated does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16350:

Attachment: 
tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
 processing threads true]-1-output.txt

> StateUpdated does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>            Reporter: Matthias J. Sax
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16350) StateUpdated does not init transaction after canceling task close action

2024-03-06 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16350:
---

 Summary: StateUpdated does not init transaction after canceling 
task close action
 Key: KAFKA-16350
 URL: https://issues.apache.org/jira/browse/KAFKA-16350
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


With EOSv2, we use a thread producer shared across all tasks. We init tx on the 
producer with each _task_ (due to EOSv1 which uses a producer per task), and 
have a guard in place to only init tx a single time.

If we hit an error, we close the producer and create a new one, which is still 
not initialized for transaction. At the same time, with state updater, we 
schedule a "close task" action on error.

For each task we get back, we do cancel the "close task" action, to actually 
keep the task. If this happens for _all_ tasks, we don't have any task in state 
CREATED at hand, and thus we never init the producer for transactions, because 
we assume this was already done.

On the first `send` request, we crash with an IllegalStateException:
{code:java}
Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
{code}
This bug is exposed via EOSIntegrationTest (logs attached).
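
For illustration only: the same invalid transition can be reproduced outside 
of Streams with a plain transactional producer that skips `initTransactions()` 
(standalone sketch, not the actual Streams code path; bootstrap server and 
transactional id are placeholders):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class UninitializedTxExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-tx-id");   // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // initTransactions() is intentionally skipped here, mimicking the
            // re-created but never re-initialized producer described above.
            producer.beginTransaction(); // throws IllegalStateException:
            // "Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION"
        }
    }
}
{code}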



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

