[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single 
record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).

This ticket suggests adding a per-record error handler that allows users to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams' `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259, which inspired this ticket.

A production exception handler could also be useful when a user tries to write 
into a non-existent topic: the write returns a retryable error code, so with 
infinite retries the producer would hang, retrying forever. A handler could 
help break the infinite retry loop.

  was:
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler would be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single 
> record-specific error might fail the whole batch (e.g., `RecordTooLargeException`).
> This ticket suggests adding a per-record error handler that allows users to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams' `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259, which inspired this ticket.
> A production exception handler could also be useful when a user tries to 
> write into a non-existent topic: the write returns a retryable error code, so 
> with infinite retries the producer would hang, retrying forever. A handler 
> could help break the infinite retry loop.
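
To make the proposal in this ticket concrete, here is a minimal sketch of what a per-record error handler could look like. Everything in it is hypothetical: `RecordErrorHandler`, `Response`, `buildBatch`, and the length limit are illustrative stand-ins and not part of the Kafka producer API (the ticket is labeled needs-kip, so no interface has been designed yet). String length stands in for serialized record size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types exist in the Kafka producer API.
class RecordErrorHandlerSketch {

    /** What the handler tells the simulated producer to do with a bad record. */
    enum Response { FAIL, SKIP }

    /** Hypothetical per-record callback, loosely modeled on ProductionExceptionHandler. */
    interface RecordErrorHandler {
        Response handle(String record, Exception error);
    }

    /** Stand-in for a size limit; string length plays the role of serialized size. */
    static final int MAX_RECORD_LENGTH = 10;

    /** Simulates building a batch and returns the records accepted into it. */
    static List<String> buildBatch(List<String> records, RecordErrorHandler handler) {
        List<String> batch = new ArrayList<>();
        for (String record : records) {
            if (record.length() > MAX_RECORD_LENGTH) {
                Exception error = new IllegalArgumentException("record too large: " + record.length());
                if (handler.handle(record, error) == Response.FAIL) {
                    // One bad record fails the whole batch.
                    throw new IllegalStateException("batch failed", error);
                }
                continue; // SKIP: drop only the offending record
            }
            batch.add(record);
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "this-record-is-too-large", "b");
        System.out.println(buildBatch(records, (r, e) -> Response.SKIP)); // prints [a, b]
    }
}
```

A handler returning `FAIL` reproduces the behavior described above, where a single record-specific error fails the whole batch; returning `SKIP` is the opt-in behavior the ticket asks for.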



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggests to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

Another example for which a production exception handler would be useful, if a 
user tries to write into a non-existing topic, which returns a retryable error 
code; with infinite retries the producer would hang retrying forever.

  was:
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggests to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.
> Another example for which a production exception handler would be useful, if 
> a user tries to write into a non-existing topic, which returns a retryable 
> error code; with infinite retries the producer would hang retrying forever.





[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer collects multiple records into batches, and a single record 
specific error might fail the whole batch (eg, `RecordTooLargeException`).

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

  was:
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer collects multiple records into batches, and a single record 
> specific error might fail the whole batch (eg, `RecordTooLargeException`).
> This ticket suggest to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.





[jira] [Updated] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15309:

Description: 
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`).

The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.

  was:
The producer batches up multiple records into batches, and a single record 
specific error might fail the whole batch.

This ticket suggest to add a per-record error handler, that allows user to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams `ProductionExceptionHandler`.


> Add custom error handler to Producer
> 
>
> Key: KAFKA-15309
> URL: https://issues.apache.org/jira/browse/KAFKA-15309
> Project: Kafka
>  Issue Type: New Feature
>  Components: producer 
>            Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> The producer batches up multiple records into batches, and a single record 
> specific error might fail the whole batch.
> This ticket suggest to add a per-record error handler, that allows user to 
> opt into skipping bad records without failing the whole batch (similar to 
> Kafka Streams `ProductionExceptionHandler`).
> The fix of https://issues.apache.org/jira/browse/KAFKA-9279 caused 
> https://issues.apache.org/jira/browse/KAFKA-15259 which inspired this ticket.





[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751267#comment-17751267
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

I synced up with [~cegerton], who worked on 
https://issues.apache.org/jira/browse/KAFKA-9279, and we came up with this 
idea: adding a "production exception handler" to the producer that would allow 
KS to tell the producer to not fail the TX but skip the record: 
https://issues.apache.org/jira/browse/KAFKA-15309 

If we cannot do K15309, an alternative might be to add an internal producer 
config that allows Kafka Streams to disable the proactive abort of a TX. This 
would be safe, because Kafka Streams is actually a good citizen: it calls 
`producer.flush()` and evaluates all callbacks before trying to commit. The 
issue K9279 addresses is actually bad user behavior, namely not checking for 
async errors before committing.
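
The "good citizen" pattern described above (flush, evaluate all callbacks, only then commit) can be sketched as follows. This is a simplified model, not the Kafka client: `FakeProducer` and `Callback` are hypothetical stand-ins that only mimic the shape of asynchronous sends whose completion is observed after `flush()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of "flush, check async errors, then commit"; not the Kafka client API.
class FlushThenCommitSketch {

    interface Callback {
        void onCompletion(Exception error); // error is null on success
    }

    /** Hypothetical stand-in for a transactional producer. */
    static class FakeProducer {
        private final List<Runnable> pending = new ArrayList<>();
        private final List<String> txBuffer = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        void send(String record, Callback callback) {
            // Completion is deferred until flush(), mimicking an asynchronous send.
            pending.add(() -> {
                if (record.length() > 10) {
                    callback.onCompletion(new IllegalArgumentException("record too large"));
                } else {
                    txBuffer.add(record);
                    callback.onCompletion(null);
                }
            });
        }

        void flush() {
            pending.forEach(Runnable::run);
            pending.clear();
        }

        void commitTransaction() {
            committed.addAll(txBuffer);
            txBuffer.clear();
        }

        void abortTransaction() {
            txBuffer.clear();
        }
    }

    /** Sends all records, then commits only if no callback reported an error. */
    static boolean sendAll(FakeProducer producer, List<String> records) {
        AtomicReference<Exception> firstError = new AtomicReference<>();
        for (String record : records) {
            producer.send(record, error -> {
                if (error != null) {
                    firstError.compareAndSet(null, error);
                }
            });
        }
        producer.flush(); // all callbacks have run after this point
        if (firstError.get() != null) {
            producer.abortTransaction();
            return false;
        }
        producer.commitTransaction();
        return true;
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        System.out.println(sendAll(producer, List.of("a", "b")) + " " + producer.committed);
    }
}
```

The point of the sketch is the ordering in `sendAll`: the commit decision is made only after every callback has been evaluated, which is the behavior the comment attributes to Kafka Streams.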

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE, and the client is shut down 
> as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT).
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka Streams program that transfers too-large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE, and the stream thread 
> shuts down as the default 
> behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE 
> is used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continues processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which i

[jira] [Created] (KAFKA-15309) Add custom error handler to Producer

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15309:
---

 Summary: Add custom error handler to Producer
 Key: KAFKA-15309
 URL: https://issues.apache.org/jira/browse/KAFKA-15309
 Project: Kafka
  Issue Type: New Feature
  Components: producer 
Reporter: Matthias J. Sax


The producer collects multiple records into batches, and a single 
record-specific error might fail the whole batch.

This ticket suggests adding a per-record error handler that allows users to opt 
into skipping bad records without failing the whole batch (similar to Kafka 
Streams' `ProductionExceptionHandler`).





Re: Testing FixedKeyProcessor implementation using unit tests

2023-08-04 Thread Matthias J. Sax
Thanks for filing a ticket for it: 
https://issues.apache.org/jira/browse/KAFKA-15242




On 7/14/23 1:06 AM, EXT.Zlatibor.Veljkovic wrote:

Hi Matthias,

Here's the repro of the project that has these issues 
https://github.com/zveljkovic/kafka-repro.

Please look at the:
Topology definition: 
https://github.com/zveljkovic/kafka-repro/blob/master/src/main/java/com/example/demo/DemoApplication.java
FixedKeyProcessor: 
https://github.com/zveljkovic/kafka-repro/blob/master/src/main/java/com/example/demo/MyFixedKeyProcessor.java
Test of FixedKeyProcessor: 
https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java

Test is where I am having issues.

Thanks,
Zed


-Original Message-
From: Matthias J. Sax 
Sent: Tuesday, July 11, 2023 1:13 AM
To: dev@kafka.apache.org
Subject: Re: Testing FixedKeyProcessor implementation using unit tests



Not sure right now, but could be a bug.

Can you maybe share the full stack trace and the test program?

-Matthias

On 7/10/23 3:47 AM, EXT.Zlatibor.Veljkovic wrote:

Hi, I am using kafka-streams-test-utils and have problem with testing 
FixedKeyProcessor [KIP-820 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API#KIP820:ExtendKStreamprocesswithnewProcessorAPI-InfrastructureforFixedKeyRecords].

Using mock processor context to get the forwarded message doesn't work.

class org.apache.kafka.streams.processor.api.MockProcessorContext cannot be 
cast to class org.apache.kafka.streams.processor.api.FixedKeyProcessorContext

Anything I can do to get forwarded records?

Thanks,
Zed



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751263#comment-17751263
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for digging into it. That's a good find. I was already wondering, 
because I could not see any code changes between 3.1 and 3.2 in Kafka Streams 
that would explain it.

And yes, if the producer goes into error state, it is impossible for KS to 
"revert" it; thus, I am not sure right now how we could fix it... In the end, 
Kafka Streams calls `producer.flush()`, evaluates whether there are any errors, 
detects the `RecordTooLargeException`, and executes the handler, which returns 
`CONTINUE`, and this is respected. If it were not respected, the 
`RecordTooLargeException` would be re-thrown right away. But because Kafka 
Streams does `CONTINUE`, it actually tries to commit, but cannot because the 
producer is already in error state.

A high-level idea would be to remember the input record offset and, after we 
fail and the task is restarted, have an implicit filter that drops the input 
message right away based on the offset we remembered. But such a thing would 
need very careful design...
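
A rough sketch of the "remember the offset, filter after restart" idea, under heavy simplifying assumptions: a single input partition, in-memory bookkeeping, and a hypothetical `attempt` method standing in for one run of a task. Where the remembered offsets would actually be persisted is exactly the kind of design question the comment leaves open.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of dropping a poison input record by offset after a restart.
class SkipByOffsetSketch {

    /** Input offsets whose output could not be produced (in-memory only in this sketch). */
    private final Set<Long> offsetsToDrop = new HashSet<>();

    /**
     * One processing attempt over input offsets [from, to).
     * Returns the offsets that were processed, or throws if the transaction had to abort.
     */
    List<Long> attempt(long from, long to, Set<Long> unproducible) {
        List<Long> processed = new ArrayList<>();
        for (long offset = from; offset < to; offset++) {
            if (offsetsToDrop.contains(offset)) {
                continue; // implicit filter: drop the remembered input record right away
            }
            if (unproducible.contains(offset)) {
                offsetsToDrop.add(offset); // remember before failing
                throw new IllegalStateException("transaction aborted at input offset " + offset);
            }
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        SkipByOffsetSketch task = new SkipByOffsetSketch();
        Set<Long> unproducible = Set.of(2L);
        try {
            task.attempt(0, 4, unproducible);
        } catch (IllegalStateException e) {
            System.out.println("first attempt: " + e.getMessage());
        }
        System.out.println("after restart: " + task.attempt(0, 4, unproducible)); // [0, 1, 3]
    }
}
```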


[jira] [Updated] (KAFKA-15308) Wipe Stores upon OffsetOutOfRangeException in ALOS

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15308:

Affects Version/s: (was: 3.4.0)
   (was: 3.5.0)

> Wipe Stores upon OffsetOutOfRangeException in ALOS
> --
>
> Key: KAFKA-15308
> URL: https://issues.apache.org/jira/browse/KAFKA-15308
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Colt McNealy
>Priority: Minor
>
> As per this [Confluent Community Slack 
> Thread|https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1690843733272449?thread_ts=1690663361.858559=C48AHTCUQ],
>  Streams currently does not wipe away RocksDB state upon encountering an 
> `OffsetOutOfRangeException` in ALOS.
>  
> `OffsetOutOfRangeException` is a rare case that occurs when a standby task 
> requests offsets that no longer exist in the topic. We should wipe the store 
> for three reasons:
>  # Not wiping the store can be a violation of ALOS since some of the 
> now-missing offsets could have contained tombstone records.
>  # Wiping the store has no performance cost since we need to replay the 
> entirety of what's in the changelog topic anyways.
>  # I have heard (not yet confirmed myself) that we wipe the store in EOS 
> anyways, so fixing this bug could remove a bit of complexity from supporting 
> EOS and ALOS.





Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-08-04 Thread Matthias J. Sax
owever
in many cases for various architecture reasons there is a desire to remove
JDBC queries from the data source and replace it with CDC streaming data to
Kafka. So in that case assembling data entities from Kafka topics instead
of JDBC would be beneficial.

Please let me know what you think.

Regards,

Igor

On Tue, Jul 25, 2023 at 5:53 PM Matthias J. Sax  wrote:


Igor,

thanks for the KIP. Interesting proposal. I am wondering a little bit
about the use-case and semantics, and if it's really required to add
what you propose? Please correct me if I am wrong.

In the end, a stream-table join is a "stream enrichment" (via a table
lookup). Thus, it's inherently a 1:1 join (in contrast to a FK
table-table join, which is an n:1 join).

If this assumption is correct, and you have data for which the table
side join attribute is in the value, you could actually repartition the
table data using the join attribute as the PK of the table.

If my assumption is incorrect, and you say you want to have a 1:n join
(note that I intentionally reversed from n:1 to 1:n), I would rather
object, because it seems to violate the idea to "enrich" a stream, which
means that each input record produces one output record, not multiple.

Also note: for a FK table-table join, we use the foreignKeyExtractor to
get the join attribute from the left input table (which corresponds to
the KStream in your KIP; i.e., it's an n:1 join), while you propose to
apply the foreignKeyExtractor to the KTable (which is the right
input, and thus it would be a 1:n join).

Maybe you can clarify the use case a little bit. For the current KIP
description I only see the 1:1 join case, which would mean we might not
need such a feature?
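
To illustrate the 1:1 case, here is a minimal sketch of re-keying the table by the join attribute and then doing a plain lookup per stream record. It uses ordinary Java maps rather than the Kafka Streams DSL; `Row`, `Event`, `rekeyByEmail`, and `enrich` are made-up names for this example, and the re-keying only works if the join attribute is unique in the table (which is what 1:1 means here).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "repartition the table by the join attribute, then enrich the stream".
class RekeyTableSketch {

    /** Table row: the join attribute (email) lives in the value, not in the primary key. */
    static class Row {
        final String email;
        final String name;
        Row(String email, String name) { this.email = email; this.name = name; }
    }

    /** Stream record keyed by the join attribute. */
    static class Event {
        final String email;
        final String action;
        Event(String email, String action) { this.email = email; this.action = action; }
    }

    /** Re-keys the table so that the join attribute becomes its primary key. */
    static Map<String, String> rekeyByEmail(Map<Integer, Row> tableByPk) {
        Map<String, String> byEmail = new HashMap<>();
        for (Row row : tableByPk.values()) {
            byEmail.put(row.email, row.name); // assumes email is unique across rows
        }
        return byEmail;
    }

    /** Stream enrichment: each input record yields at most one output record. */
    static List<String> enrich(List<Event> stream, Map<String, String> tableByEmail) {
        List<String> out = new ArrayList<>();
        for (Event event : stream) {
            String name = tableByEmail.get(event.email);
            if (name != null) { // inner-join semantics: drop records without a match
                out.add(event.action + " by " + name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, Row> table = Map.of(
            1, new Row("a@example.com", "Alice"),
            2, new Row("b@example.com", "Bob"));
        List<Event> stream = List.of(
            new Event("b@example.com", "login"),
            new Event("a@example.com", "logout"));
        System.out.println(enrich(stream, rekeyByEmail(table))); // [login by Bob, logout by Alice]
    }
}
```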


-Matthias


On 7/24/23 11:36 AM, Igor Fomenko wrote:

Hello developers of the Kafka Streams,

I would like to start a discussion on KIP-955: Add stream-table join on
foreign key
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key>

This KIP proposes a new API to join a KStream with a KTable based on a foreign
key relation.
This KIP was inspired by one of my former projects to integrate RDBMS
databases with data consumers using Change Data Capture and Kafka.
If we had the capability in Kafka Streams to join a KStream with a KTable on a
foreign key, this would simplify our implementation significantly.

Looking forward to your feedback and discussion.

Regards,

Igor







Re: [DISCUSS] KIP-962 Relax non-null key requirement in Kafka Streams

2023-08-04 Thread Matthias J. Sax

Guozhang,

thanks for pointing out ValueJoinerWithKey. In the end, it's just a 
documentation change, i.e., pointing out that the passed-in key could be 
`null`, and similar?


-Matthias


On 8/2/23 3:20 PM, Guozhang Wang wrote:

Thanks Florin for the writeup,

One quick thing I'd like to bring up is that in KIP-149
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner)
we introduced ValueJoinerWithKey, which is aimed at enhancing
ValueJoiner. It would benefit this KIP in that
implementers can distinguish "null-key" vs. "not-null-key but
null-value" scenarios.

Hence I'd suggest we also include the semantic changes with
ValueJoinerWithKey, which can help distinguish these two scenarios,
and also document that users who apply ValueJoiner only do not get
this benefit, and hence we suggest that users use the former.
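
A small sketch of why the key matters here. The two interfaces below are local stand-ins that mirror the shape of Kafka Streams' `ValueJoiner` and `ValueJoinerWithKey` (they are redeclared so the example is self-contained); the joiner bodies and the output strings are purely illustrative.

```java
// Local stand-ins mirroring the shape of the Kafka Streams joiner interfaces.
class JoinerSketch {

    interface ValueJoiner<V1, V2, VR> {
        VR apply(V1 value1, V2 value2);
    }

    interface ValueJoinerWithKey<K1, V1, V2, VR> {
        VR apply(K1 readOnlyKey, V1 value1, V2 value2);
    }

    /** Without the key, "null key" and "non-null key without a match" look identical. */
    static final ValueJoiner<String, String, String> PLAIN =
        (left, right) -> left + "+" + right;

    /** With the key, the two scenarios can be told apart. */
    static final ValueJoinerWithKey<String, String, String, String> WITH_KEY =
        (key, left, right) -> {
            if (key == null) {
                return "null-key:" + left;
            }
            if (right == null) {
                return key + ":no-match:" + left;
            }
            return key + ":" + left + "+" + right;
        };

    public static void main(String[] args) {
        // Both left-join scenarios collapse to the same result with the plain joiner.
        System.out.println(PLAIN.apply("a", null));         // a+null
        System.out.println(WITH_KEY.apply(null, "a", null)); // null-key:a
        System.out.println(WITH_KEY.apply("k", "a", null));  // k:no-match:a
    }
}
```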


Guozhang

On Mon, Jul 31, 2023 at 12:11 PM Florin Akermann
 wrote:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams


[jira] [Created] (KAFKA-15307) Kafka Streams configuration docs outdate

2023-08-04 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15307:
---

 Summary: Kafka Streams configuration docs outdate
 Key: KAFKA-15307
 URL: https://issues.apache.org/jira/browse/KAFKA-15307
 Project: Kafka
  Issue Type: Task
  Components: docs, streams
Reporter: Matthias J. Sax


[https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html]
 needs to be updated.

It's missing a lot of newly added configs, and still lists already removed 
configs.

For deprecated configs, we could consider also removing them, or add a 
"deprecated config" section and keep them for the time being.





[jira] [Commented] (KAFKA-15302) Stale value returned when using store.all() in punctuation function.

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751236#comment-17751236
 ] 

Matthias J. Sax commented on KAFKA-15302:
-

Thanks for reporting this issue. When you call `store.all()` you get an 
iterator back that is built over the cache as well as RocksDB. The 
underlying RocksDB iterator provides an immutable snapshot, thus any later 
writes into RocksDB are not visible to the iterator. Thus, if the cache is 
flushed, and we try to read the key from the cache and cannot find it, we go to 
the underlying RocksDB iterator, which cannot see the write. This should explain 
it.

What I am wondering though right now is why the cache would get flushed to 
begin with. There should not be an explicit `store.flush()` call, because we 
only flush before a `commit()`, which happens on the same thread; we might also 
`evict()` during a `put()` if the cache overflows, but there is no `put()` call 
in between; the third case I could find is when a new `StreamThread` is added 
and we need to resize the cache (this would indeed be a concurrent operation; 
could adding/removing a thread explain what you observe?).

Otherwise we would need to do more digging into why the cache is flushed to 
begin with. If we flush incorrectly and can avoid the flush, we should be able 
to fix it. If we flush correctly, we might need to have a guard inside the 
caching layer itself and suppress the flush if there is an open iterator (which 
does not actually sound like a great solution, but maybe it would be the 
correct way forward).
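
The stale read described above can be reproduced in a toy model. This is only a sketch: two in-memory maps stand in for the cache and RocksDB, and the hypothetical `MergedView` imitates an iterator that reads the live cache but a snapshot of the store taken when `all()` was called. It does not use or reflect the actual Kafka Streams caching code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model: live cache on top of a store whose iterator sees an immutable snapshot.
class StaleIteratorSketch {

    private final Map<String, Integer> cache = new HashMap<>(); // dirty, unflushed entries
    private final Map<String, Integer> store = new TreeMap<>(); // stand-in for RocksDB

    void put(String key, int value) {
        cache.put(key, value);
    }

    Integer get(String key) {
        Integer cached = cache.get(key);
        return cached != null ? cached : store.get(key);
    }

    void flush() {
        store.putAll(cache);
        cache.clear();
    }

    /** Stand-in for the iterator returned by all(): live cache plus a store snapshot. */
    class MergedView {
        private final Map<String, Integer> snapshot = new TreeMap<>(store);

        Integer read(String key) {
            Integer cached = cache.get(key);
            return cached != null ? cached : snapshot.get(key);
        }
    }

    MergedView all() {
        return new MergedView();
    }

    public static void main(String[] args) {
        StaleIteratorSketch s = new StaleIteratorSketch();
        s.put("k", 1);
        s.flush();                  // store: k=1
        s.put("k", 2);              // cache: k=2
        MergedView view = s.all();  // snapshot still has k=1
        s.flush();                  // cache emptied while the view is open
        System.out.println("iterator=" + view.read("k") + " store=" + s.get("k")); // iterator=1 store=2
    }
}
```

In this model the stale value appears only when a flush happens between `all()` and the read, which matches the open question in the comment about why the cache is flushed at that point.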

> Stale value returned when using store.all() in punctuation function.
> 
>
> Key: KAFKA-15302
> URL: https://issues.apache.org/jira/browse/KAFKA-15302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Jinyong Choi
>Priority: Major
>
> When using the store.all() function within the Punctuation function of 
> this.context.schedule, the previous value is returned. In other words, even 
> though the value has been stored from 1 to 2, it doesn't return 2; instead, 
> it returns 1.
> In the provided test code, you can see the output 'BROKEN !!!', and while 
> this doesn't occur 100% of the time, by adding logs, it's evident that during 
> the while loop after all() is called, the cache is flushed. As a result, the 
> named cache holds a null value, causing the return of a value from RocksDB. 
> This is observed as the value after the .get() call is different from the 
> expected value. This is possibly due to the consistent read functionality of 
> RocksDB, although the exact cause is not certain.
> Of course, if you perform {{store.flush()}} before {{all()}} there won't be 
> any errors.
>  
>  * test code (forked from balajirrao and modified for this)
> [https://github.com/jinyongchoi/kafka-streams-multi-runner/|https://github.com/jinyongchoi/kafka-streams-multi-runner/tree/main]
>  
> {code:java}
> private void forwardAll(final long timestamp) {
> //
>     System.err.println("forwardAll Start");
>     KeyValueIterator Integer> kvList = this.kvStore.all();
>     while (kvList.hasNext()) {
>         KeyValue entry = kvList.next();
>         final Record msg = new Record<>(entry.key, entry.value, context.currentSystemTimeMs());
>         final Integer storeValue = this.kvStore.get(entry.key);
>         if (entry.value != storeValue) {
>             System.err.println("[" + instanceId + "]" + "!!! BROKEN !!! Key: " + entry.key
>                 + " Expected in stored(Cache or Store) value: " + storeValue
>                 + " but KeyValueIterator value: " + entry.value);
>             throw new RuntimeException("Broken!");
>         }
>         this.context.forward(msg);
>     }
>     kvList.close();
> }
> {code}
>  * log file (add log in stream source)
>  
> {code:java}
> # console log
> sbt clean "worker/assembly"; sbt "worker/assembly"; sbt "coordinator / run 1"
> [info] welcome to sbt 1.8.2 (Ubuntu Java 11.0.20)
> ...
> [info] running Coordinator 1
> appid: 95108c48-7c69-4eeb-adbd-9d091bd84933
> [0] starting instance +1
> forwardAll Start
> [0]!!! BROKEN !!! Key: 636398 Expected in stored(Cache or Store) value: 2 but 
> KeyValueIterator value: 1
> # log file
> ...
> 01:05:00.382 
> [95108c48-7c69-4eeb-adbd-9d091bd84933-67de276e-fce4-4621-99c1-aea7849262d2-StreamThread-1]
>  INFO  o.a.k.s.state.internals.NamedCache -- Named cache 0_0-Counts stats on 
> flush: #hits=5628524, #misses=5636397, #overwrites=63

[jira] [Commented] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17751212#comment-17751212
 ] 

Matthias J. Sax commented on KAFKA-15303:
-

Thanks for filing this ticket.

Kafka Streams has only limited support for schema evolution, because Kafka 
Streams is schema agnostic: the runtime does not even know what format is 
used, and thus cannot reason about it. I changed the ticket type to 
"Improvement" because it's not a bug: the system works as designed.

In the end, Kafka Streams uses whatever Serde you provide, so it's not clear if 
we could even fix it on our end. Maybe you could put some hack into the Serde 
you provide to fix it? Unfortunately, it's not possible right now to get the 
original raw bytes (that would allow us to avoid the re-serialization to begin 
with).
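
To make the mechanism concrete, here is a minimal, self-contained sketch of why re-serializing a stored value with a newer schema will almost always change the hash. It is a toy model, not Kafka Streams code: the class name, the two `serializeV1`/`serializeV2` methods, the extra `currency` field, and the use of `Arrays.hashCode` as a stand-in hash are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Toy model (hypothetical names): the same logical value produces different
// bytes, and hence a different hash, once the serializer writes an extra field.
class SchemaHashSketch {

    // Hypothetical "v1 schema": only the offer id is written.
    static byte[] serializeV1(String offerId) {
        return offerId.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical "v2 schema": a compatible change that appends a defaulted field.
    static byte[] serializeV2(String offerId) {
        return (offerId + "|currency=EUR").getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for the hash that is compared when a subscription response arrives.
    static int hash(byte[] bytes) {
        return Arrays.hashCode(bytes);
    }

    public static void main(String[] args) {
        int hashBeforeDeploy = hash(serializeV1("offer-1")); // sent with the subscription
        int hashAfterDeploy = hash(serializeV2("offer-1"));  // recomputed from the store
        System.out.println("hashes match: " + (hashBeforeDeploy == hashAfterDeploy));
    }
}
```

A Serde-level workaround would have to make both code paths produce identical bytes for an unchanged value, which is what the "hack into the Serde" suggestion amounts to.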

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> The process(...) method of this class computes a hash from the local store 
> by re-serializing a deserialized value from the left state store, and 
> compares it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our Kafka Streams instance with 
> a new compatible Avro schema for the left side of a join, every join 
> triggered by the right part of the join is invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join is discarded because all the hashes computed by Kafka Streams now 
> differ from those of the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stor

[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:

Issue Type: Improvement  (was: Bug)

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Critical
> Attachments: image (1).png
>

[jira] [Updated] (KAFKA-15303) Foreign key joins no longer triggered by events on the right side of the join after deployment with a new compatible Avro schema

2023-08-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15303:

Priority: Major  (was: Critical)

> Foreign key joins no longer triggered by events on the right side of the join 
> after deployment with a new compatible Avro schema
> 
>
> Key: KAFKA-15303
> URL: https://issues.apache.org/jira/browse/KAFKA-15303
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Charles-Eddy
>Priority: Major
> Attachments: image (1).png
>
>
> Hello everyone, I am currently working on a project that uses Kafka Streams 
> (version 3.4.0) with a Kafka broker (version 2.7.0) managed by Amazon MSK.
> Our goal is to join offer information from our sellers with additional data 
> from various input topics, and then feed the resulting joined information 
> into an output topic.
> Our application is deployed in Kubernetes using the StatefulSet feature, with 
> one EBS volume per Kafka Streams pod and 5 Streams Threads per pod.
> We are using avro to serialize / deserialize input topics and storing in the 
> state stores of Kafka streams.
> We have encountered a bug in Kafka Streams that prevents us from deploying 
> new versions of Kafka Streams containing new compatible Avro schemas of our 
> input topics.
> The symptom is that after deploying our new version, which contains no 
> changes in topology but only changes to the Avro schema used, we discard 
> every event coming from the right part of the join concerned by these Avro 
> schema changes until we receive something from the left part of the join.
> As a result, we are losing events and corrupting our output topics and stores 
> with outdated information.
> After checking the local help for the priority to assign, I have assigned it 
> as *CRITICAL* because we are losing data (for example, tombstones are not 
> propagated to the following joins, so some products are still visible on our 
> website when they should not be).
> Please feel free to change the priority if you think it is not appropriate.
>  
> *The bug:*
> After multiple hours of investigation we found out that the bug is located in 
> the foreign key join feature and specifically in this class: 
> *SubscriptionResolverJoinProcessorSupplier* in the left part of a foreign key 
> join. 
> The process(...) method of this class computes a hash from the local store 
> by re-serializing a deserialized value from the left state store, and 
> compares it with the hash of the original message from the 
> subscription-response-topic. 
>  
> It means that when we deploy a new version of our Kafka Streams instance with 
> a new compatible Avro schema for the left side of a join, every join 
> triggered by the right part of the join is invalidated until we receive all 
> the events again on the left side. Every join triggered by the right part of 
> the join is discarded because all the hashes computed by Kafka Streams now 
> differ from those of the original messages.
>  
> *How to reproduce it:*
> If we take a working and a non-working workflow, it will do something like 
> this:
> +Normal non-breaking+ workflow from the left part of the FK join:
>  # A new offer event occurs. The offer is received and stored (v1).
>  # A subscription registration is sent with the offer-hash (v1).
>  # The subscription is saved to the store with the v1 offer-hash.
>  # Product data is searched for.
>  # If product data is found, a subscription response is sent back, including 
> the v1 offer-hash.
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
> New product event from the right part of the FK join:
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent for each offer, including their offer hash 
> (v1).
>  # The offer data in the store is searched for and the offer hashes between 
> the store (v1) and response event (also v1) are compared.
>  # Finally, the join result is sent.
>  
> +Breaking workflow:+ 
> The offer serializer is changed to offer v2
> New product event from the right part of the FK join: 
>  # The product is received and stored.
>  # All related offers in the registration store are searched for.
>  # A subscription response is sent 

[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-08-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750506#comment-17750506
 ] 

Matthias J. Sax commented on KAFKA-15116:
-

Are you saying you are using a custom store? In that case, it's your 
responsibility to make sure it works with Kafka Streams. If you violate 
assumptions Kafka Streams makes (one of them is that a task can access a store 
independently of all other tasks), all bets are off, unfortunately. Thus, you 
would need to change your store so that one task cannot block any other task 
from making progress.
{quote}It is the behaviour we have observed and validated with a fix to the 
core streams code. 
{quote}
Are you referring to 
{quote}The fix is to add a check for rebalancing in the while loop in runOnce. 
This checks if a rebalancing is in progress and sets the numIterations to 0 to 
stop processing of messages. When it has rebalanced it sets numIterations back 
to 1.
{quote}
from further above? It's not clear to me why this would help. In the end, when 
a rebalance starts, we might continue processing until we need to hand off a 
partition. In that case, we need to commit pending transactions first, and 
would then start a new transaction for the partitions we still own.
{quote}Committing the open transactions is fine (if possible). The problem is 
the un committed transactions due to the rebalancing.
{quote}
Not sure if I can follow. An open transaction and an uncommitted transaction 
are the same thing... When a rebalance starts, Kafka Streams would commit all 
pending transactions first, and thus there should not be any pending 
transactions. Of course, as said above, a new TX might get started right away 
for all partitions we did not need to hand off, and processing would continue.
{quote}If we have two un committed transactions for the same partition key we 
end up in the blocking state because the second message cannot be processed 
because the first message hasn't been committed. 
{quote}
What do you mean by "two un-committed transactions for the same partition key"? 
If there are two messages with the same key, they should be in the same input 
topic partition (the only exception would be some stateless processing, but in 
your case state is involved). This ensures that a single task (and thus a 
single thread) processes all records with the same key, and thus there is only 
one transaction for this key. If you use a custom state store and violate this 
assumption by putting two records into different partitions, so that they are 
potentially processed by two threads, and you thereby create a deadlock on the 
state store when both threads try to access the same row for this key, it's an 
incorrect usage of Kafka Streams. 
{quote}The old behaviour sounds like it would solve our problem. Is there a 
configuration option to switch this back on?
{quote}
[~ableegoldman] might know if it's possible to switch off cooperative 
rebalancing, but again, it seems the issue is how you use Kafka Streams (maybe 
I am wrong): you should never block in a Processor (and in your case maybe even 
end up in a deadlock until some timeout hits, if I understood what you are 
saying correctly). Also, even if it's possible to disable cooperative 
rebalancing, the old behavior is effectively deprecated and eager rebalancing 
will be completely removed in a future release.
{quote}To answer your question "How should the system know if there is a 
dependency?": Through configuration. I don't think anything that we are trying 
to do is going against how Kafka is designed. It might be non optimal and 
legacy but it does feel like something that streams should be flexible enough 
to handle. Why can't we chose to "stop the world"?
{quote}
That is conceptually possible, and even for cooperative rebalancing we could 
`pause()` all partitions and not process anything. But again, from what I 
understand so far, the issue is blocking in user code, not how Kafka Streams 
works.
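
The "same key, same partition, same task" assumption discussed above can be illustrated with a small sketch. It assumes keyed records are assigned to a partition by hashing the serialized key modulo the partition count. The Java producer's default partitioner uses murmur2 for this; `Arrays.hashCode` is substituted here only to keep the example self-contained, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Toy model: a deterministic key-to-partition mapping means all records with
// the same key land in one partition and are handled by one task.
class KeyPartitionSketch {

    // Stand-in for the default partitioner (which uses murmur2, not Arrays.hashCode).
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // mask the sign bit to stay non-negative
    }

    public static void main(String[] args) {
        int numPartitions = 6; // hypothetical partition count
        int first = partitionFor("order-42", numPartitions);
        int second = partitionFor("order-42", numPartitions);
        System.out.println("same partition for same key: " + (first == second));
    }
}
```

If a custom partitioner or a custom store breaks this mapping, two threads can end up contending for the same key, which is the deadlock scenario described in the comment.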

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebala

[jira] [Commented] (KAFKA-15297) Cache flush order might not be topological order

2023-08-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750504#comment-17750504
 ] 

Matthias J. Sax commented on KAFKA-15297:
-

The ticket description contains an example to reproduce it (and there is also a 
png attachment visualizing the topology). 
{quote}which in turn *should* reflect the topological order of the attached 
processor nodes.
{quote}
That's not always the case unfortunately.

> Cache flush order might not be topological order 
> -
>
> Key: KAFKA-15297
> URL: https://issues.apache.org/jira/browse/KAFKA-15297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Bruno Cadonna
>Priority: Major
> Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not 
> correspond to the topological order of the state stores in the topology. The 
> order depends on how the processors and state stores are added to the 
> topology. 
> In some cases downstream state stores might be flushed before upstream state 
> stores. That means, that during a commit records in upstream caches might end 
> up in downstream caches that have already been flushed during the same 
> commit. If a crash happens at that point, those records in the downstream 
> caches are lost. Those records are lost for two reasons:
> 1. Records in caches are only changelogged after they are flushed from the 
> cache. However, the downstream caches have already been flushed and they will 
> not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that now are 
> blocked in the downstream caches are committed during the same commit and so 
> they will not be re-processed after the crash.
> An example for a topology where the flush order of the caches is wrong is the 
> following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor<String, String, String, String>() {
>             private ProcessorContext<String, String> context;
>             @Override
>             public void init(ProcessorContext<String, String> context) {
>                 this.context = context;
>             }
>             @Override
>             public void process(Record<String, String> record) {
>                 context.forward(record);
>             }
>             @Override
>             public void close() {}
>         },
>         Named.as("processor1")
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
> In the topology {{processor1}} is connected to {{stateStoreC}}. If 
> {{processor1}} is added to the topology before the other processors, i.e., if 
> the right branch of the topology is added before the left branch as in the 
> code above, the cache of {{stateStoreC}} is flushed before the caches of 
> {{stateStoreA}} and {{stateStoreB}}.
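
The effect of the flush order can be reproduced with a toy model that does not use Kafka Streams at all. In this sketch, each cache forwards its dirty entries to a downstream cache when flushed, and a "commit" flushes every cache exactly once in a given order. The `Cache` class and the method names are hypothetical and only mirror the A -> B -> C chain from the ticket.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of one commit: every cache is flushed exactly once, in the given order.
class FlushOrderSketch {

    static class Cache {
        final String name;
        final Cache downstream; // null for the last store in the chain
        final Map<String, String> dirty = new LinkedHashMap<>();
        final List<String> changelog = new ArrayList<>();

        Cache(String name, Cache downstream) {
            this.name = name;
            this.downstream = downstream;
        }

        // Flushing writes each dirty entry to the changelog and forwards it downstream.
        void flush() {
            for (Map.Entry<String, String> entry : dirty.entrySet()) {
                changelog.add(entry.getKey());
                if (downstream != null) {
                    downstream.dirty.put(entry.getKey(), entry.getValue());
                }
            }
            dirty.clear();
        }
    }

    // Returns how many entries are still sitting un-flushed in store C after the commit.
    static int dirtyInCAfterCommit(boolean topologicalOrder) {
        Cache c = new Cache("stateStoreC", null);
        Cache b = new Cache("stateStoreB", c);
        Cache a = new Cache("stateStoreA", b);
        a.dirty.put("key", "value");

        List<Cache> order = topologicalOrder ? Arrays.asList(a, b, c) : Arrays.asList(c, a, b);
        for (Cache cache : order) {
            cache.flush();
        }
        return c.dirty.size();
    }

    public static void main(String[] args) {
        System.out.println("topological order, left in C: " + dirtyInCAfterCommit(true));
        System.out.println("C flushed first, left in C:   " + dirtyInCAfterCommit(false));
    }
}
```

When C is flushed first, the entry reaches C only after C's single flush for this commit. It is therefore neither changelogged nor re-processed if a crash follows, which matches the two reasons listed in the description.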



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12829) Remove Deprecated methods under Topology

2023-08-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750503#comment-17750503
 ] 

Matthias J. Sax commented on KAFKA-12829:
-

It's actually unclear when 4.0 will come along, so it seems premature to work 
on a PR now, as it would become outdated over time. I am sure there will be at 
least a 3.7 after the next release, which is 3.6, and to me it seems not 
unlikely that there might even be a 3.8.

> Remove Deprecated methods under Topology
> 
>
> Key: KAFKA-12829
> URL: https://issues.apache.org/jira/browse/KAFKA-12829
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The following methods were deprecated in version 2.7:
>  * org.apache.kafka.streams.Topology#addProcessor(java.lang.String, 
> org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...) 
>  * 
> org.apache.kafka.streams.Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder,
>  java.lang.String, org.apache.kafka.common.serialization.Deserializer, 
> org.apache.kafka.common.serialization.Deserializer, java.lang.String, 
> java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier)
>  * 
> org.apache.kafka.streams.Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder,
>  java.lang.String, org.apache.kafka.streams.processor.TimestampExtractor, 
> org.apache.kafka.common.serialization.Deserializer, 
> org.apache.kafka.common.serialization.Deserializer, java.lang.String, 
> java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier) 
>  
> See KAFKA-10605 and KIP-478.





[jira] [Updated] (KAFKA-15299) Support left stream-table join on foreign key

2023-08-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15299:

Labels: kip  (was: )

> Support left stream-table join on foreign key
> -
>
> Key: KAFKA-15299
> URL: https://issues.apache.org/jira/browse/KAFKA-15299
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Igor Fomenko
>Assignee: Igor Fomenko
>Priority: Major
>  Labels: kip
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> KIP-955: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-955%3A+Add+stream-table+join+on+foreign+key
> Currently in the Kafka Streams DSL, KStream-to-KTable joins can only be 
> performed on the record keys. However, in practice it is often required to 
> join the messages in Kafka topics using a message field as a "foreign key" 
> with the following pattern:  
>  
> streamX.leftJoin(tableY, RecordTableY::getForegnKey, 
> joiner).to("output-topic-name")
>  
> The left join on foreign key operation will result in a stream of messages 
> from two topics joined on the foreign key, where one output message is 
> produced for each event on the input stream.
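
The intended semantics can be sketched with plain Java collections. This is only an illustration of "one output per stream event, with a null table side when nothing matches". It is not the API proposed in KIP-955, and the `leftJoin` helper, its parameters, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Toy model of a left join on a foreign key extracted from each stream event.
class ForeignKeyLeftJoinSketch {

    static <V, FK, T, R> List<R> leftJoin(List<V> stream,
                                          Map<FK, T> table,
                                          Function<V, FK> foreignKeyExtractor,
                                          BiFunction<V, T, R> joiner) {
        List<R> output = new ArrayList<>();
        for (V event : stream) {
            // Look up the table row by the foreign key; null if there is no match.
            T tableRow = table.get(foreignKeyExtractor.apply(event));
            // Left join: always emit exactly one result per stream event.
            output.add(joiner.apply(event, tableRow));
        }
        return output;
    }

    // Hypothetical sample data: events look like "<name>:<foreignKey>".
    static List<String> demo() {
        Map<String, String> table = new HashMap<>();
        table.put("1", "one");
        List<String> stream = Arrays.asList("a:1", "b:9");
        return leftJoin(stream, table,
                event -> event.split(":")[1],
                (event, row) -> event + "->" + row);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```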





Re: [QUESTION] What is the difference between sequence and offset for a Record?

2023-08-01 Thread Matthias J. Sax

The _offset_ is the position of the record in the partition.

The _sequence number_ is a unique ID that allows the broker to de-duplicate 
messages. It requires the producer to implement the idempotency protocol 
(part of Kafka transactions); thus, sequence numbers are optional, and as 
long as you don't want to support idempotent writes, you don't need to 
worry about them. (If you want to dig into details, check out KIP-98, which 
is the original KIP about Kafka TX.)
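
As a rough illustration of the difference, the sketch below derives both values for the records of one batch. It assumes, as described in the question, that each is computed as a base value plus the record's offset delta. The base sequence is chosen by the producer, while the base offset is assigned by the broker when the batch is appended. The class name and the numbers are hypothetical, and wrap-around handling for sequence numbers is deliberately left out.

```java
// Toy model of how a record's offset and sequence are derived from its batch.
class RecordPositionSketch {

    // Offset: position in the partition log; the base offset is assigned by the broker.
    static long offsetOf(long baseOffset, int offsetDelta) {
        return baseOffset + offsetDelta;
    }

    // Sequence: per-producer counter set by an idempotent producer.
    // Simplification: no wrap-around handling.
    static int sequenceOf(int baseSequence, int offsetDelta) {
        return baseSequence + offsetDelta;
    }

    public static void main(String[] args) {
        int baseSequence = 7;         // chosen by the producer (hypothetical value)
        long brokerBaseOffset = 100L; // chosen by the broker on append (hypothetical value)
        for (int delta = 0; delta < 3; delta++) {
            System.out.println("delta=" + delta
                    + " offset=" + offsetOf(brokerBaseOffset, delta)
                    + " sequence=" + sequenceOf(baseSequence, delta));
        }
    }
}
```

In this model a server only ever rewrites the base offset; the sequence values travel unchanged from the producer.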


HTH,
  -Matthias

On 8/1/23 2:19 AM, tison wrote:

Hi,

I'm writing a Kafka API Rust codec library[1] to understand how Kafka
models its concepts and how the core business logic works.

While implementing the codec for Records[2], I saw a pair of fields,
"sequence" and "offset". Both of them are calculated as
baseOffset/baseSequence + offset delta. So I'm a bit confused about how to deal
with them properly: what's the logical difference between these two
concepts?

Also, to understand how the core business logic works, I wrote a simple
server based on my codec library, and observed that the server may need to
update the offset of produced records. How does Kafka set the correct offset
for each produced record? And how does Kafka maintain the calculation of
offset and sequence during these modifications?

I'd appreciate it if anyone can answer the question or give some insights :D

Best,
tison.

[1] https://github.com/tisonkun/kafka-api
[2] https://kafka.apache.org/documentation/#messageformat



Re: [VOTE] KIP-759: Unneeded repartition canceling

2023-07-31 Thread Matthias J. Sax

+1 (binding)

On 7/11/23 11:16 AM, Shay Lin wrote:

Hi all,

I'd like to call a vote on KIP-759: Unneeded repartition canceling
The KIP has been under discussion for quite some time (two years). This is a
valuable optimization for advanced users. I hope we can push this toward
the finish line this time.

Link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

Best,
Shay



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-07-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17748367#comment-17748367
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for getting back – I was just asking a general question :) 

The DLQ feature seems to be independent though, and I agree that it could be 
useful to add to Kafka Streams.

About the handler: looking into the stacktrace, it seems that the issue is 
actually happening during commit, in particular when offsets are written:
sendOffsetsToTransaction(KafkaProducer.java:757)
This is a totally different code path. The `ProductionExceptionHandler` 
covers the `producer.send()` code path only. Looking into the code of both 
3.1 and 3.2, the behavior should be the same: for the call to 
`sendOffsetsToTransaction` the handler won't be triggered.

And for this case, we also cannot trigger the handler, because there is nothing 
to be dropped on the floor: Kafka Streams tries to write offsets to commit a 
TX, and we cannot skip writing offsets.
{quote}Our additional testing revealed that "continue" worked in Kafka Streams 
3.1.2, but no longer works since Kafka Streams 3.2.0, as rollback occurs.
{quote}
Did you test this for the `send()` case or the commit case? For the `send()` 
case, it should work for both versions; for the commit case, it should not work 
for either version (and is something that cannot be fixed).

Curious to hear about your findings.
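
The distinction between the two code paths can be summarized in a toy model. It is not Kafka Streams code: the `Response` enum, the `SendErrorHandler` interface, and the two methods are hypothetical stand-ins that only mirror the reasoning above, namely that a failed `send()` can be skipped at the handler's discretion, while a failed offset commit has nothing that could be skipped.

```java
// Toy model: the handler is consulted for send errors only, never for commit errors.
class HandlerScopeSketch {

    enum Response { CONTINUE, FAIL }

    // Hypothetical stand-in for a production exception handler.
    interface SendErrorHandler {
        Response handle(Exception error);
    }

    // send() path: the handler decides whether the bad record is dropped.
    static boolean mayContinueAfterSendError(Exception error, SendErrorHandler handler) {
        return handler.handle(error) == Response.CONTINUE;
    }

    // Commit path: writing offsets cannot be skipped, so the handler is not consulted.
    static boolean mayContinueAfterCommitError(Exception error, SendErrorHandler handler) {
        return false;
    }

    public static void main(String[] args) {
        SendErrorHandler alwaysContinue = error -> Response.CONTINUE;
        Exception error = new RuntimeException("record too large");
        System.out.println("send path continues:   " + mayContinueAfterSendError(error, alwaysContinue));
        System.out.println("commit path continues: " + mayContinueAfterCommitError(error, alwaysContinue));
    }
}
```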

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using exactly_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> exactly_once when ProductionExceptionHandlerResponse.CONTINUE is used.
>  -- However, if using exactly_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE, and the client is shut down 
> as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT). 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka Streams program that transfers too large messages 
> from "input-topic" to "output-topic" with exactly_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE, and the stream thread 
> shuts down as the default 
> behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My expected behavior is that Kafka Streams should continue processing 
> even if using exactly_once when ProductionExceptionHandlerResponse.CONTINUE 
> is used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of exactly_once, Kafka Streams 
> continues processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> - "continue" worked in Kafka Streams 3.1.2, but no longer works since Kafka 
> Streams 3.2.0, as rollback occurs.
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooL

Re: Consuming an entire partition with control messages

2023-07-27 Thread Matthias J. Sax
Well, `kafka-consumer-groups.sh` can only display the difference between 
"committed offset" and "end offset". It cannot know what the "right" 
offset to be committed is. It's really the responsibility of the 
consumers to commit correctly.


-Matthias

On 7/27/23 1:03 AM, Vincent Maurin wrote:
Thank you Matthias for your answer. I opened an issue on the aiokafka 
project as a follow-up, let's see how we can resolve it there:
https://github.com/aio-libs/aiokafka/issues/911


As mentioned in the issue, some tools like kafka-consumer-groups.sh also 
display a lag of "1" in this kind of situation


Best regards,

Vincent

On 13/06/2023 17:27, Matthias J. Sax wrote:

Sounds like a bug in aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer 
should step over it, and report the correct position after the marker.


The official KafkaConsumer (ie, the Java one), does the exact same thing.
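
To make the "step over the marker" behavior concrete, here is a minimal JDK-only sketch of a restore loop that stops once the position reaches the end offset captured up front. It is a model under stated assumptions, not the Kafka client: `ModelConsumer`, `poll(int)`, and the use of `null` for a transaction marker are all hypothetical names chosen for illustration. With the Java client the corresponding calls would be `KafkaConsumer#endOffsets` and `KafkaConsumer#position`.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model of the restore loop; these names are hypothetical and are
// NOT the Kafka client API.
final class ChangelogRestoreSketch {

    /** Stand-in for a consumer reading one partition; a null entry models a tx marker. */
    static final class ModelConsumer {
        private final List<String> log;
        private int position = 0;

        ModelConsumer(List<String> log) {
            this.log = log;
        }

        int position() {
            return position;
        }

        int endOffset() {
            return log.size();
        }

        /** Returns at most maxRecords data records and steps over any markers it meets. */
        List<String> poll(int maxRecords) {
            List<String> records = new ArrayList<>();
            while (position < log.size() && records.size() < maxRecords) {
                String entry = log.get(position);
                position++; // the position advances for markers too
                if (entry != null) {
                    records.add(entry);
                }
            }
            return records;
        }
    }

    /** Restores until the position reaches the end offset captured before the loop. */
    static List<String> restore(ModelConsumer consumer) {
        int end = consumer.endOffset();
        List<String> state = new ArrayList<>();
        while (consumer.position() < end) {
            state.addAll(consumer.poll(2)); // may be empty when only a marker was left
        }
        return state;
    }
}
```

In this model an empty poll is not a stop signal; the loop ends only because the position has moved past the trailing marker, which is the behavior a client would need for the position-versus-end-offset check to terminate.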


-Matthias

On 5/30/23 8:41 AM, Vincent Maurin wrote:

Hello !

I am working on an exactly-once stream processor in Python, using the
aiokafka client library. My program stores a state in memory, which is
recovered from a changelog topic, like in Kafka Streams.

On each processing loop, I am consuming messages, producing messages
to an output topic and to my changelog topic, within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine consuming the changelog topic from the beginning to the
"end" with a read_committed isolation level. Here I am struggling to
define when to stop my recovery:
* my current (maybe) working solution is to loop over "poll" until
poll is not returning any messages anymore
* I tried to do something more based on the end offsets, then checking
the consumer position, but with control messages at the end of the
partition, I am running into an issue where the position is one below the
end offset, and doesn't go further

I had a quick look at
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on here

Best regards,
Vincent


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-26 Thread Matthias J. Sax

One last question. What should happen for the following case:

KStream myStream = build.stream(...).map(...);
myStream.markAsPartitioned().groupByKey().aggregate(...);
myStream.join(...);

The question is about the "fan-out" pattern. `myStream`, which is marked 
for partitioning, is fed into two downstream operations. Thus, it's 
clear that the aggregation won't trigger a repartitioning. However, the 
fan-out happens before `markAsPartitioned` and thus I would assume 
that the join would trigger a repartitioning?


This question is important, because if we follow what I said above, 
`markAsPartitioned` returns a new KStream object, but does not mutate the 
upstream KStream object, which are semantically two different things. It 
also has an impact on how we need to implement the feature. The KIP 
should explicitly explain this case.
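
The two possible semantics for the fan-out case can be shown with a toy model. This is only a sketch: `Node`, `markAsPartitionedCopy`, `markAsPartitionedInPlace`, and `wouldRepartition` are hypothetical names, not the KStream API or the KIP's proposed signature; the single boolean stands in for the `repartitionRequired` flag mentioned elsewhere in this thread.

```java
// JDK-only model; Node and its methods are hypothetical, not the KStream API.
final class FanOutSketch {

    static final class Node {
        private boolean repartitionRequired;

        Node(boolean repartitionRequired) {
            this.repartitionRequired = repartitionRequired;
        }

        /** A key-changing operation: downstream key-based operators must repartition. */
        Node map() {
            return new Node(true);
        }

        /** Option 1: return a new node and leave the upstream node untouched. */
        Node markAsPartitionedCopy() {
            return new Node(false);
        }

        /** Option 2: clear the flag on the upstream node itself. */
        Node markAsPartitionedInPlace() {
            this.repartitionRequired = false;
            return this;
        }

        /** True if a key-based operator (groupByKey, join) attached here would repartition. */
        boolean wouldRepartition() {
            return repartitionRequired;
        }
    }
}
```

Under option 1 a join attached to the original `myStream` still repartitions; under option 2 it silently does not, which is why the KIP needs to state which one applies.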



-Matthias

On 7/26/23 4:58 PM, Shay Lin wrote:

Hi John,

Thanks for your reply. I updated the KIP to reflect the changes we
discussed in the thread today.
#1 is duly noted, I learned from the examples Sophie sent earlier! =)

In the new version, I also talked about why IQ and joins will not work with
the interface and talked about the mitigation. The proposal
now specifically states we are solving the unneeded repartition problem when
IQ or join does not coexist in the Kafka Streams application. In the concerns
section, the proposal notes that having a reverse mapping would make this new
interface compatible with IQ and join again, but that this is subject to demand.

Let me know what you think. Thanks!
Shay



On Wed, Jul 26, 2023 at 2:35 PM John Roesler  wrote:


Hello Shay,

Thanks for the KIP!

I just took a look in preparation to vote, and there are two small-ish
things that I'd like to fix first. Apologies if this stuff has already come
up in the discussion thread; I only skimmed it.

1. The KIP only mentions the name of the method instead of providing a
code snippet showing exactly what the method signature will be in the
interface. Normally, KIPs do the latter because it removes all ambiguity
from the proposal. It also gives you an opportunity to write down the
Javadoc you would add to the method instead of just mentioning the points
that you plan to document.

2. The KIP lists some concerns, but not what you will do to mitigate them.
For example, the concern about IQ not behaving correctly. Will you disable
the use of the implicit partitioner downstream of one of these
cancellations? Or provide a new interface to supply the "reverse mapping"
you mentioned? Or include documentation in the Javadoc for how to deal with
the situation? I think there are a range of options for each of those
concerns, and we should state up front what we plan to do.

Thanks again!
-John

On 2023/07/24 20:33:05 Sophie Blee-Goldman wrote:

Thanks Shay! You and Matthias have convinced me, I'm happy with the current
proposal. I think once you make the minor
updates to the KIP document this will be ready for voting again.

Cheers,
Sophie

On Mon, Jul 24, 2023 at 8:26 AM Shay Lin  wrote:


Hi Sophie and Matthias, thanks for your comments and replies.

1. Scope of change: KStreams only or KStreams/KTable
I took some time to digest your points, looking through how KStreams
triggers repartitions today. I noticed that `repartitionRequired` is a flag
in KStreamImpl etc. and not in KTableImpl etc. When I look further, in the
case of KTable, instead of passing in a boolean flag, a repartition node
`TableRepartitionMapNode` is directly created. I went back and referenced
the two issue tickets KAFKA-10844 and KAFKA-4835; both requests were
focused on KStreams, i.e. not to change the partitioning when the input
streams are already correctly keyed. Is it possible that in the case of KTable,
users always intend to repartition (change key) when they call
aggregate? -- (this was written before I saw Matthias's comment)

Overall, based on the tickets, I see the benefit of doing a contained
change focusing on KStreams, i.e. repartitionRequired, which would solve
the pain points nicely. If we run into similar complaints/optimization
requests for KTable down the line, we can address them on top of this (let
me know if we have these requests already, I might just be negligent).

2. API: markAsPartitioned() vs config
If we go with the KStreams-only scope, markAsPartitioned() is more
adequate, i.e. maps nicely to repartitionRequired. There is a list of
NamedOperations that may or may not trigger repartition based on their
context (KStreams or KTable), which would make the implementation more
confusing.

3. KIP documentation: Thanks for providing the links to previous KIPs. I
will be adding the three use cases and javadoc. I will also document the
risks as they relate to IQ and Join.

Best,
Shay

On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax wrote:

I agree that it could easily be misused. There are a few Jira tickets for
cases when people want to "cancel" a repartition step. I would hope
those

Re: [DISCUSS] KIP-960: Support interactive queries (IQv2) for versioned state stores

2023-07-26 Thread Matthias J. Sax
Thanks for the KIP Alieh. Glad to see that we can add IQ to the new 
versioned stores!




Couple of questions:


single-key lookup with timestamp (upper) bound


Not sure if "bound" is the right term? In the end, it's a point lookup 
for a key plus timestamp, so it's an as-of timestamp (not a bound)? Of 
course, the returned record would most likely have a different (smaller) 
timestamp, but that's expected and does not make the passed-in timestamp 
a "bound" IMHO?
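
The "as-of" reading can be sketched in a few lines. This is a JDK-only illustration, not the versioned store implementation or the KIP's API: `AsOfLookupSketch`, `put`, and `getAsOf` are hypothetical names, and the model assumes each version of a single key is indexed by the timestamp from which it is valid.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// JDK-only model of an as-of lookup for ONE key; all names are hypothetical.
final class AsOfLookupSketch {

    // Versions of a single key, ordered by the timestamp they became valid.
    private final TreeMap<Instant, String> versions = new TreeMap<>();

    void put(Instant validFrom, String value) {
        versions.put(validFrom, value);
    }

    /** Returns the version that was current at asOf, i.e. the latest validFrom <= asOf. */
    Optional<String> getAsOf(Instant asOf) {
        Map.Entry<Instant, String> entry = versions.floorEntry(asOf);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

The returned version usually carries an earlier timestamp than the one passed in, yet exactly one version is returned, which is why it reads as a point lookup rather than a bounded range.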



single-key query with timestamp range
single-key all versions query


Should we also add `withLowerTimeBound` and `withUpperTimeBound` 
(similar to what `RangeQuery` has)?


Btw: I think we should not pass `long` for timestamps, but `Instant` types.

For time-range queries, do we iterate over the values in timestamp 
ascending order? If yes, the interface should specify it? Also, would it 
make sense to add reverse order (also ok to exclude and only do if there 
is demand in a follow up KIP; if not, please add to "Rejected 
alternatives" section).


Also, for time-range queries, what are the exact bounds for what we 
include? In the end, a value has a "valid range" (conceptually), so do 
we include a record if its valid range overlaps the search time-range, 
or must it be fully included? Or would we only say that the `validFrom` 
timestamp that is stored must be in the search range (which implies that 
the lower end would be a non-overlapping but "fully included" bound, 
while the upper end would be an overlapping bound)?
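
The three candidate semantics differ only in the predicate applied to each version. A small JDK-only sketch, with several assumptions that are mine and not the KIP's: both the validity interval and the search range are treated as half-open `[from, to)`, timestamps are plain `long` values to keep the model short, and `Mode`, `Version`, `matches`, and `select` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model of three possible time-range semantics; all names are hypothetical.
final class TimeRangeSemanticsSketch {

    enum Mode { OVERLAPS, FULLY_INCLUDED, VALID_FROM_IN_RANGE }

    /** One record version, assumed valid for the half-open interval [validFrom, validTo). */
    static final class Version {
        final String value;
        final long validFrom;
        final long validTo;

        Version(String value, long validFrom, long validTo) {
            this.value = value;
            this.validFrom = validFrom;
            this.validTo = validTo;
        }
    }

    /** Decides whether a version belongs to the search range [from, to) under the given mode. */
    static boolean matches(Version v, long from, long to, Mode mode) {
        switch (mode) {
            case OVERLAPS:
                return v.validFrom < to && v.validTo > from;
            case FULLY_INCLUDED:
                return v.validFrom >= from && v.validTo <= to;
            case VALID_FROM_IN_RANGE:
                return v.validFrom >= from && v.validFrom < to;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    /** Returns the values of all matching versions, in input order. */
    static List<String> select(List<Version> versions, long from, long to, Mode mode) {
        List<String> result = new ArrayList<>();
        for (Version v : versions) {
            if (matches(v, from, to, mode)) {
                result.add(v.value);
            }
        }
        return result;
    }
}
```

For versions covering [0,10), [10,20), and [20,30) and a search range of [5,25), the three modes select all three versions, only the middle one, and the last two respectively, so the choice is visible to users and worth spelling out in the interface.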


For key-range / time-range queries: do we return the result in `` 
order or `` order? Also, what about reverse iterators?


About `ValueIterator` -- I think the JavaDocs have a c&p error in them for 
`peekNextRecord` (also, should it be called `peekNextValue`?). Also, some 
other JavaDocs seem to be incomplete and do not describe all parameters?



Thanks.



-Matthias



On 7/26/23 7:24 AM, Alieh Saeedi wrote:

Hi all,

I would like to propose a KIP to support IQv2 for versioned state stores.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores

Looking forward to your feedback!

Cheers,
Alieh



Re: Streams: Global Store Processors

2023-07-26 Thread Matthias J. Sax

That's correct.

And yes, it's a public contract and thus a KIP would be needed to change 
(or remove it). Deprecation implies that the API is kept for at least 3 
releases (ie, one year), plus it can only be removed in a major release.


For example, if we deprecated something in the 2.1 release, we could have 
removed it in 3.0. If we deprecated something in the 2.7 release (there was 
only 2.8 and then 3.0), we can only remove it in 4.0 (only things deprecated 
in 2.6 or earlier could be removed in 3.0 to meet the 3 releases / one 
year requirement).
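
The rule can be written down as a small function. This is a sketch under two assumptions: the caller supplies the ordered list of releases, and a major release is any version string ending in `.0`. The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.Optional;

// Sketch of the "3 releases, then next major" rule; names are hypothetical.
final class DeprecationWindowSketch {

    // A deprecated API must ship, still deprecated, in at least this many releases.
    static final int MIN_RELEASES_DEPRECATED = 3;

    /** Assumption: a major release is one whose version string ends in ".0". */
    static boolean isMajor(String version) {
        return version.endsWith(".0");
    }

    /**
     * Returns the first major release that is at least MIN_RELEASES_DEPRECATED
     * releases after the one in which the API was deprecated, if the list has one.
     */
    static Optional<String> earliestRemoval(List<String> releases, String deprecatedIn) {
        int start = releases.indexOf(deprecatedIn);
        if (start < 0) {
            throw new IllegalArgumentException("unknown release: " + deprecatedIn);
        }
        for (int i = start + MIN_RELEASES_DEPRECATED; i < releases.size(); i++) {
            if (isMajor(releases.get(i))) {
                return Optional.of(releases.get(i));
            }
        }
        return Optional.empty();
    }
}
```

With a release list of 2.0 through 2.8 followed by 3.0, later 3.x releases, and 4.0, the function gives 3.0 for something deprecated in 2.1 or 2.6, and 4.0 for something deprecated in 2.7, matching the examples in this mail.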


But I am not sure if we would really remove the processor, or actually 
change the restore path instead. Last but not least, if there were a 
KIP about removing it, the main goal would be to hide the store from 
user code, so we could still allow registering some "call-back 
processor" that has no access to the state store itself.



-Matthias

On 7/26/23 10:39 AM, Colt McNealy wrote:

Hi all,

In this JIRA ticket: https://issues.apache.org/jira/browse/KAFKA-7663 it's
documented that Global Stores are bypassed on restoration.

Consequently, the input topic to a Global Store needs to essentially be a
changelog topic as the keys and values are copied directly into the store.

I heard (perhaps in the Slack) a while ago that there was some conversation
about removing the ability to supply a Processor to the Global Store to
prevent users from tripping over that behavior. However, we currently rely
on the Processor to notify other parts of our application that things have
changed in the store (eg. for cache invalidation, metrics, etc). Obviously,
we make sure to respect the semantics of how the processor+global store
works for restoration etc...

It seems to me like the fact that we can pass in a Processor is a public
API contract, so it should be safe to rely on that...? Would it require a
KIP to change the fact that we can pass in a Processor? How much
deprecation notice would we have before we need to find a new solution?

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*



[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once

2023-07-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747630#comment-17747630
 ] 

Matthias J. Sax commented on KAFKA-15259:
-

Thanks for opening this ticket. Given that you are using "exactly-once", does 
it actually make sense to configure the handler with "continue"? Using 
"continue" implies data loss and thus contradicts the usage of "exactly-once".

> Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once
> 
>
> Key: KAFKA-15259
> URL: https://issues.apache.org/jira/browse/KAFKA-15259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Tomonari Yamashita
>Priority: Major
> Attachments: Reproducer.java, app_at_least_once.log, 
> app_exactly_once.log
>
>
> [Problem]
>  - Kafka Streams does not continue processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE if using execute_once.
>  -- "CONTINUE will signal that Streams should ignore the issue and continue 
> processing"(1), so Kafka Streams should continue processing even if using 
> execute_once when ProductionExceptionHandlerResponse.CONTINUE used.
>  -- However, if using execute_once, Kafka Streams does not continue 
> processing due to rollback despite 
> ProductionExceptionHandlerResponse.CONTINUE. And the client will be shut down 
> as the default behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) 
> [Environment]
>  - Kafka Streams 3.5.1
> [Reproduction procedure]
>  # Create "input-topic" topic and "output-topic"
>  # Put several messages on "input-topic"
>  # Execute a simple Kafka streams program that transfers too large messages 
> from "input-topic" to "output-topic" with execute_once and returns 
> ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the 
> producer. Please refer to the reproducer program (attached file: 
> Reproducer.java).
>  # ==> However, Kafka Streams does not continue processing due to rollback 
> despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread 
> shutdown as the default 
> behavior(StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to 
> the debug log (attached file: app_exactly_once.log).
>  ## My excepted behavior is that Kafka Streams should continue processing 
> even if using execute_once. when ProductionExceptionHandlerResponse.CONTINUE 
> used.
> [As far as my investigation]
>  - FYI, if using at_least_once instead of execute_once, Kafka Streams 
> continue processing without rollback when 
> ProductionExceptionHandlerResponse.CONTINUE is used. Please refer to the 
> debug log (attached file: app_at_least_once.log).
> (1) CONFIGURING A STREAMS APPLICATION > default.production.exception.handler
>  - 
> [https://kafka.apache.org/35/documentation/streams/developer-guide/config-streams.html#default-production-exception-handler]
> (2) Transaction abort and shutdown occur
> {code:java}
> 2023-07-26 21:27:19 DEBUG KafkaProducer:1073 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactionalId=java-kafka-streams-0_0] Exception occurred during message 
> send:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 ERROR RecordCollectorImpl:322 - stream-thread 
> [java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1] 
> stream-task [0_0] Error encountered sending record to topic output-topic for 
> task 0_0 due to:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> Exception handler choose to CONTINUE processing in spite of this error but 
> written offsets would not be recorded.
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 
> 1188 bytes when serialized which is larger than 1048576, which is the 
> value of the max.request.size configuration.
> 2023-07-26 21:27:19 INFO  TransactionManager:393 - [Producer 
> clientId=java-kafka-streams-e3187cf9-5337-4155-a7cd-fd4e426b889d-StreamThread-1-0_0-producer,
>  transactio

[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Labels: kip  (was: )

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Description: 
KIP-960: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores]
 

Query types to consider include:
 * single-key latest-value lookup
 * single-key lookup with timestamp bound
 * single-key query with timestamp range
 * single-key all versions query

 * key-range latest-value query
 * key-range query with timestamp bound
 * key-range query with timestamp range
 * key-range all versions query

 * all-keys latest-value query
 * all-keys all versions (i.e., entire store) query

 

 

 

 

  was:
Query types to consider include:
 * single-key latest-value lookup
 * single-key lookup with timestamp bound
 * single-key query with timestamp range
 * single-key all versions query

 * key-range latest-value query
 * key-range query with timestamp bound
 * key-range query with timestamp range
 * key-range all versions query

 * all-keys latest-value query
 * all-keys all versions (i.e., entire store) query

 

 

 

 


> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
>
> KIP-960: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+interactive+queries+%28IQv2%29+for+versioned+state+stores]
>  
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  





[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Component/s: streams

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  





[jira] [Updated] (KAFKA-15257) Support interactive queries (IQv2) with versioned state store

2023-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15257:

Issue Type: New Feature  (was: Task)

> Support interactive queries (IQv2) with versioned state store
> -
>
> Key: KAFKA-15257
> URL: https://issues.apache.org/jira/browse/KAFKA-15257
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> Query types to consider include:
>  * single-key latest-value lookup
>  * single-key lookup with timestamp bound
>  * single-key query with timestamp range
>  * single-key all versions query
>  * key-range latest-value query
>  * key-range query with timestamp bound
>  * key-range query with timestamp range
>  * key-range all versions query
>  * all-keys latest-value query
>  * all-keys all versions (i.e., entire store) query
>  
>  
>  
>  





[jira] [Commented] (KAFKA-12317) Relax non-null key requirement for left/outer KStream joins

2023-07-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17747210#comment-17747210
 ] 

Matthias J. Sax commented on KAFKA-12317:
-

Thanks for picking it up – I think it would be good to tackle all 4 related 
tickets at once, to make sure we don't get a zoo of different behavior for 
different operators.

[~ableegoldman] [~vvcephei] [~guozhang] [~cadonna] – do you think we need a KIP 
for this change?

> Relax non-null key requirement for left/outer KStream joins
> ---
>
> Key: KAFKA-12317
> URL: https://issues.apache.org/jira/browse/KAFKA-12317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>Assignee: Florin Akermann
>Priority: Major
>
> Currently, for a stream-stream and stream-table/globalTable join 
> KafkaStreams drops all stream records with a `null`-key (`null`-join-key for 
> stream-globalTable), because for a `null`-(join)key the join is undefined: 
> ie, we don't have an attribute to do the table lookup (we consider the 
> stream-record as malformed). Note, that we define the semantics of 
> _left/outer_ join as: keep the stream record if no matching join record was 
> found.
> We could relax the definition of _left_ stream-table/globalTable and 
> _left/outer_ stream-stream join though, and not drop `null`-(join)key stream 
> records, and call the ValueJoiner with a `null` "other-side" value instead: 
> if the stream record key (or join-key) is `null`, we could treat it as 
> "failed lookup" instead of treating the stream record as corrupted.
> If we make this change, users that want to keep the current behavior, can add 
> a `filter()` before the join to drop `null`-(join)key records from the stream 
> explicitly.
> Note that this change also requires changing the behavior if we insert a 
> repartition topic before the join: currently, we drop `null`-key records 
> before writing into the repartition topic (as we know they would be dropped 
> later anyway). We need to relax this behavior for a left stream-table and 
> left/outer stream-stream join. Users need to be aware (ie, we might need to 
> put this into the docs and JavaDocs), that records with `null`-key would be 
> partitioned randomly.
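
The difference between the current and the relaxed left-join semantics can be modelled without Kafka Streams. The sketch below is an illustration only: `leftJoin` and its `dropNullKeys` switch are hypothetical, a `Map` stands in for the table, and a `BiFunction` stands in for the `ValueJoiner`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// JDK-only model of a left stream-table join; all names are hypothetical.
final class NullKeyLeftJoinSketch {

    /**
     * Joins each stream record against the table.
     * dropNullKeys = true models the current behavior (null-key records are dropped);
     * dropNullKeys = false models the relaxed behavior (treated as a failed lookup).
     */
    static <V, T, R> List<R> leftJoin(List<Map.Entry<String, V>> stream,
                                      Map<String, T> table,
                                      BiFunction<V, T, R> joiner,
                                      boolean dropNullKeys) {
        List<R> out = new ArrayList<>();
        for (Map.Entry<String, V> record : stream) {
            String key = record.getKey();
            if (key == null && dropNullKeys) {
                continue; // current behavior: the record is considered malformed
            }
            // A null key cannot be looked up, so the "other side" is null.
            T other = (key == null) ? null : table.get(key);
            out.add(joiner.apply(record.getValue(), other));
        }
        return out;
    }
}
```

Users who prefer the current behavior would get it back by filtering out `null`-key records before the join, which corresponds to passing `dropNullKeys = true` in this model.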





[jira] [Created] (KAFKA-15251) Upgrade system test to use 3.5.1

2023-07-25 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15251:
---

 Summary: Upgrade system test to use 3.5.1
 Key: KAFKA-15251
 URL: https://issues.apache.org/jira/browse/KAFKA-15251
 Project: Kafka
  Issue Type: Test
  Components: streams, system tests
Reporter: Matthias J. Sax


3.5.1 was released and we should update the upgrade system tests accordingly to 
use the new version





Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2023-07-25 Thread Matthias J. Sax

Igor,

thanks for the KIP. Interesting proposal. I am wondering a little bit 
about the use-case and semantics, and if it's really required to add 
what you propose? Please correct me if I am wrong.


In the end, a stream-table join is a "stream enrichment" (via a table 
lookup). Thus, it's inherently a 1:1 join (in contrast to a FK 
table-table join which is a n:1 join).


If this assumption is correct, and you have data for which the table 
side join attribute is in the value, you could actually repartition the 
table data using the join attribute as the PK of the table.
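
A toy version of that re-keying idea, assuming the join really is 1:1. Everything here is hypothetical and JDK-only: `Row`, `rekeyByJoinAttribute`, and `enrich` are illustration names, and a `Map` stands in for the KTable.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// JDK-only model of re-keying a table by its join attribute; names are hypothetical.
final class RekeyTableSketch {

    /** A table row whose join attribute lives in the value, not in the primary key. */
    static final class Row {
        final String joinAttribute;
        final String payload;

        Row(String joinAttribute, String payload) {
            this.joinAttribute = joinAttribute;
            this.payload = payload;
        }
    }

    /** Re-keys the table by the join attribute; with duplicate attributes the last row wins. */
    static Map<String, Row> rekeyByJoinAttribute(Map<String, Row> tableByPrimaryKey) {
        Map<String, Row> rekeyed = new LinkedHashMap<>();
        for (Row row : tableByPrimaryKey.values()) {
            rekeyed.put(row.joinAttribute, row);
        }
        return rekeyed;
    }

    /** Plain key lookup: one stream record in, at most one enriched record out. */
    static String enrich(String streamKey, String streamValue, Map<String, Row> rekeyedTable) {
        Row match = rekeyedTable.get(streamKey);
        return match == null ? null : streamValue + "+" + match.payload;
    }
}
```

The "last row wins" comment is the important caveat: if several table rows share one join attribute, re-keying collapses them, so this approach only covers the 1:1 case.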


If my assumption is incorrect, and you say you want to have a 1:n join 
(note that I intentionally reversed from n:1 to 1:n), I would rather 
object, because it seems to violate the idea to "enrich" a stream, which 
means that each input record produces one output record, not multiple?


Also note: for a FK table-table join, we use the foreignKeyExtractor to 
get the join attribute from the left input table (which corresponds to 
the KStream in your KIP; ie, it's a n:1 join), while you propose that 
the foreignKeyExtractor be applied to the KTable (which is the right 
input, and thus it would be a 1:n join).


Maybe you can clarify the use case a little bit. For the current KIP 
description I only see the 1:1 join case, which would mean we might not 
need such a feature?



-Matthias


On 7/24/23 11:36 AM, Igor Fomenko wrote:

Hello developers of the Kafka Streams,

I would like to start a discussion on KIP-955: Add stream-table join on
foreign key

This KIP proposes a new API to join a KStream with a KTable based on a
foreign key relation.
This KIP was inspired by one of my former projects to integrate RDBMS
databases with data consumers using Change Data Capture and Kafka.
If we had the capability in Kafka Streams to join a KStream with a KTable on
a foreign key, this would simplify our implementation significantly.

Looking forward to your feedback and discussion.

Regards,

Igor



[jira] [Updated] (KAFKA-15242) FixedKeyProcessor testing is unusable

2023-07-24 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15242:

Component/s: streams

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  





Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-23 Thread Matthias J. Sax
the discussion
around CUSTOM is in the rejected alternatives but I'm happy to defer to
whatever the project conventions are :)


If it matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as we
do now, and if not we assume it's a fully qualified class name and try to
instantiate it?

Note that there is no functionality for this kind of thing in
AbstractConfig (it's either a String validated enum or a class) so this
would be a departure from convention. Again, I'm happy to implement that if
it's preferred.


Also wondering how it would relate to the existing `Stores` factory?


StoreTypeSpec will depend on Stores factory - they're one layer removed.
You can imagine that StoreTypeSpec is just a grouping of methods from the
Stores factory into a convenient package for default configuration
purposes.

Thanks again for all the detailed thoughts Matthias!

On Fri, Jul 21, 2023 at 11:50 AM Matthias J. Sax  wrote:


Thanks for the KIP. Overall I like the idea to close this gap.

However, I am wondering if we should close other gaps first? In
particular, IIRC, we have a few cases for which we only have a RocksDB
implementation for a store, and thus, adding an in-memory version for
these stores first, to make the current `IN_MEMORY` parameter work,
might be the first step?

In particular, this holds for the new versioned-store (but I actually
believe there is some other internal store with no in-memory
implementation). -- For `suppress()` it's actually the other way around:
we only have an in-memory implementation. Do you aim to allow custom
stores for `suppress()`, too?

Btw: Should versioned stores also be covered by the KIP (ie,
`StoreTypeSpec`)? We did consider to add a new option `VERSIONED` to the
existing `default.dsl.store` config, but opted out for various reasons.

Last, I am not sure if the new parameter replacing the existing one is
the best way to go? Did you put the idea to add `CUSTOM` to the existing
config into rejected alternatives? Personally, I would prefer to add
`CUSTOM` as I would like to optimize for ease of use for the majority of
users (who don't implement a custom store), but only switch to
in-memory sometimes. -- As an alternative, you could also just extend
`default.dsl.store` (it's just a String) and allow to pass in anything.
If it matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as
we do now, and if not we assume it's a fully qualified class name and
try to instantiate it? -- Given that we plan to keep the store-enum, it
seems cleaner to keep the existing config and keep both the config and
enum aligned to each other?


It's just a preliminary thought. I will need to go back and take a more
detailed look into the code to grok how the proposed `StoreTypeSpec`
falls into place. Also wondering how it would relate to the existing
`Stores` factory?

-Matthias


On 7/21/23 6:45 AM, Colt McNealy wrote:

Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StoreTypeSpec javadocs.

Quick question then. Is the javadoc that says:


Order is not guaranteed as bytes lexicographical ordering might not
represent key order.

no longer correct, and should say:


Order guarantees depend on the underlying implementation of the
ReadOnlyKeyValueStore. For more information, please consult the
[StoreTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman <ableegold...@gmail.com> wrote:


Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
over how restrictive the default.dsl.store config would be if not
extendable to custom store types, especially given that this seems to be
the primary userbase of such a feature. At the time we didn't really have
any better ideas for a clean way to achieve that, but what you proposed
makes a lot of sense to me. Happy to see a good solution to this, and
hopefully others will share my satisfaction :P

I did have one quick piece of feedback which arose from an unrelated
question posed to the dev mailing list w/ subject line
"ReadOnlyKeyValueStore#range() Semantics"
<https://lists.apache.org/thread/jbckmth8d3mcgg0rd670cpvsgwzqlwrm>. I
recommend checking out the full thread for context, but it made me think
about how we can leverage the new StoreTypeSpec concept as an answer to the
long-standing question in Streams: where can we put guarantees of the
public contract for RocksDB (or other store implementations) when all the
RocksDB stuff is technically internal.

Basically, I'm suggesting two things: first, call out in some way (perhaps
the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
contract in itself and should outline any semantic guarantees it does, or
does not, make. Second, we should add a note on ordering guarantees in the
two OOTB specs: for RocksDB we assert that range queries will honor
serialized byte ordering,

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax
I agree that it could easily be misused. There are a few Jira tickets for
cases when people want to "cancel" a repartition step. I would hope
those tickets are linked to the KIP (if not, we should do this, and
maybe even copy those cases as motivation into the KIP itself)?


It's always a tricky question to what extent we want to guide users, and
to what extent we need to give levers for advanced cases (and how to
design those levers...) It's for sure a good idea to call out "use with
care" in the JavaDocs for the new method.



-Matthias

On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:

I guess I felt a bit uneasy about how this could be used/abused while
reading the KIP, but if we truly believe this is an advanced feature, I'm
fine with the way things currently are. It doesn't feel like the best API,
but it does seem to be the best *possible* API given the way things are.

W.r.t the KTable notes, that all makes sense to me. I just wanted to lay
out all the potential cases to make sure we had our bases covered.

I still think an example or two would help, but the only thing I will
actually wait on before feeling comfortable enough to vote on this would be
a clear method signature (and maybe sample javadocs) in the "Public
Interfaces" section.

Thanks again for the KIP Shay! Hope I haven't dragged it out too much

On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax  wrote:


Some thoughts about the API question.



A. kstream.groupBy(...).aggregate(...)


This can be rewritten as

kstream.selectKey(...)
 .markAsRepartitioned()
 .groupByKey()
 .aggregate()

Given that `markAsRepartitioned` is an advanced feature, I think it would
be ok?



B. ktable.groupBy(...).aggregate(...)


For KTable aggregation, not sure how useful it would be? In the end, a
table aggregation only makes sense if we pick something from the
value, ie, we indeed change the key?



C. kstream.selectKey(...).join(ktable)


We can just insert a `markAsRepartitioned()` after `selectKey` to avoid
repartitioning of the left input KStream.



KStream.selectKey(...).toTable().join(...)


Not sure if I understand what you are trying to say with this example? In the
end, `selectKey(...).toTable()` would repartition. If I know that one can
upsert directly, one inserts a `markAsRepartitioned()` in between.


In general, the use case seems to be that the key is not in the right
"format", or there is no key, but data was partitioned by a
value-attribute upstream and we just want to extract this
value-attribute into the key. Both seem to be KStream cases?
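
To make the precondition in that use case concrete, here is a minimal sketch in plain Java (no Kafka dependency). It checks whether records that were partitioned upstream by a value attribute already sit in the partition their new key would map to, which is the situation in which skipping the repartition step would be safe. All names (`RepartitionCheckSketch`, `Order`, `customerId`) are hypothetical, and `partitionFor` is only a stand-in hash, not Kafka's actual partitioner.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not part of the Kafka Streams API. */
final class RepartitionCheckSketch {

    /** A record whose key is missing or unsuitable, but whose value carries the attribute. */
    static final class Order {
        final String key;        // may be null or in the "wrong" format
        final String customerId; // value attribute the upstream producer partitioned by

        Order(String key, String customerId) {
            this.key = key;
            this.customerId = customerId;
        }
    }

    /** Stand-in partitioner (assumption): a simple hash modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * Returns true if every record already lives in the partition that its
     * extracted key (customerId) maps to, i.e. a repartition would move nothing.
     */
    static boolean alreadyPartitionedBy(Map<Integer, List<Order>> partitions, int numPartitions) {
        for (Map.Entry<Integer, List<Order>> entry : partitions.entrySet()) {
            for (Order order : entry.getValue()) {
                if (partitionFor(order.customerId, numPartitions) != entry.getKey()) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

If the check would fail for real data, marking the stream as partitioned would silently co-locate the wrong records, which is why the thread suggests a "use with care" note in the JavaDocs.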


-Matthias



On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:

Hey Shay, while I don't have any specific concerns about the new public API
in this KIP, I'd like to better understand how this feature will work
before I vote. We should document the behavior of this new operator clearly
in the KIP as well -- you don't necessarily need to write the complete
javadocs up front, but it should be possible for a user to read the KIP and
then understand how this feature will work and how they would need to apply
it.

To that end, I recommend framing this proposal with a few examples to help
clarify the semantics. When and where can you apply the markAsPartitioned()
operator? Some suggestions below.

Specific notes:

1. The KIP opens with "Each key changing operation in Kafka Streams
(selectKey, map, transform, etc.) now leads to automatic repartition before
an aggregation." We should change "aggregation" to "stateful operation" as
this is true for things like joins as well as aggregations
2. The callout on IQ makes me a bit uncomfortable -- basically it says this
should not be a concern "if we use markAsPartitioned correctly". Does this
mean if we, the devs implementing this, write the feature correctly? Or is
it saying that this won't be a problem as long as "we", the users of this
feature, use it correctly? Just wondering if you've put any thought into
how this would work yet (I personally have not)
3. The KIP should lay out the proposed API exactly, even if there's only
one new method. Check out this KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
(or this KIP
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
for a good reference on what the Public Interfaces section should include
4. Regarding the proposed API itself, I wonder if KStream is really the
most appropriate interface for the new operator. A repartition can be
triggered on just a KTable. Here's where some examples would help. Perhaps
we could focus on these three cases:

A. kstream.groupBy(...).aggregate(...)
B. ktable.groupBy(...).aggregate(...)
C. kstream.selectKey(...).join(ktable)

I'm sure someone will correct me if I'm missing any additional vital
examples, but at the very least, these are the three to con

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-07-21 Thread Matthias J. Sax
g
partitioned" is conceptually a property of other operations applied to a
KStream/KTable, rather than an operation itself. So rather than making this
a DSL operator itself, what if we added it to the Grouped and various
Joined configuration classes? It would allow us to more carefully hit only
the relevant parts of the DSL, so there are no questions about whether/when
to throw errors when the operator is incorrectly applied -- there would be
no way to apply it incorrectly. The main drawback I can think of is simply
that this touches on a larger surface area of the API. I personally don't
believe this is a good enough reason to make it a DSL operator as one could
make that argument for nearly any kind of KStream or KTable operator
configuration going forward, and would explode the KStream/KTable API
surface area instead. Perhaps this was discussed during the previous
iteration of this KIP, or I'm missing something here, so I just wanted to
put this out there and see what people think

Either way, thanks for picking up this KIP. It's been a long time coming :)

-Sophie





On Mon, Jul 10, 2023 at 2:05 PM Shay Lin  wrote:


Hi all,

It's been a few days so I went ahead with editing the KIP, the main change
is on the method name

https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
.
I will follow up with a VOTE separately.

Best,
Shay

On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax  wrote:


Shay,

thanks for picking up this KIP. It's a pity that the discussion stalled
for such a long time.

As expressed previously, I am happy with the name `markAsPartitioned()`
and also believe it's ok to just document the impact and leave it to the
user to do the right thing.

If we really get a lot of users that ask about it, because they did not
do the right thing, we could still add something (eg, a reverse-mapper
function) in a follow-up KIP. But we don't know if it's necessary; thus,
making a small incremental step sounds like a good approach to me.

Let's see if others agree or not.


-Matthias

On 6/28/23 5:29 PM, Shay Lin wrote:

Hi all,

Great discussion thread. May I take this KIP up? If it’s alright my plan is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications to downstream
joins or aggregation operations. Still, the operator is intended for
advanced users so my two cents is it would be a valuable addition
nonetheless. We could add this as a caution/consideration as part of the
java doc.

Let me know, thanks.
Shay









Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-21 Thread Matthias J. Sax
I am not a clients (or threading) expert, but I tend to agree with Colin's
concerns.


In particular, it would be nice to see an example of how you intend to use
the API (I am not familiar with Kotlin or its coroutines), to better
understand what this change helps to solve to begin with.


Opening up the consumer sounds potentially dangerous and we should
weigh opportunity and risk before making a decision. So far, I see
risks but do not understand the opportunity you are after.



-Matthias

On 7/14/23 11:43 AM, Kirk True wrote:

Hi Erik,

Thanks for the KIP!

I empathize with your frustration over the radio silence. It gets like that 
sometimes, and I apologize for my lack of feedback.

I’d personally like to see this lively exchange move over to the DISCUSS thread 
you’d created before.

Thanks,
Kirk


On Jul 14, 2023, at 1:33 AM, Erik van Oosten  
wrote:

Hi Colin,

The way I understood Philip's message is that KIP-944 also plays nice with 
KIP-945. But I might be mistaken.

Regardless, KIP-945 does /not/ resolve the underlying problem (the need for 
nested consumer invocations) because it has the explicit goal of not changing 
the user facing API.


... KIP-945 but haven't posted a DISCUSS thread yet


There is a thread called 'KafkaConsumer refactor proposal', but indeed no 
official discussion yet.


I really don't want to be debugging complex interactions between Java 
thread-local variables and green threads.


In that email thread, I proposed an API change in which callbacks are no longer 
needed. The proposal completely removes the need for such complex interactions. 
In addition, it gives clients the ability to process at full speed even while a 
cooperative rebalance is ongoing.

Regards,
 Erik.

Op 14-07-2023 om 00:36 schreef Colin McCabe:

Hi Philip & Erik,

Hmm... if we agree that KIP-945 addresses this use case, I think it would be 
better to just focus on that KIP. Fundamentally it's a better and cleaner model 
than a complex scheme involving thread-local variables. I really don't want to 
be debugging complex interactions between Java thread-local variables and green 
threads.

It also generally helps to have some use-cases in mind when writing these 
things. If we get feedback about what would be useful for async runtimes, that 
would probably help improve and focus KIP-945. By the way, I can see you have a 
draft on the wiki for KIP-945 but haven't posted a DISCUSS thread yet, so I 
assume it's not ready for review yet ;)

best,
Colin


On Tue, Jul 11, 2023, at 12:24, Philip Nee wrote:

Hey Erik - Another thing I want to add to my comment: we are in the process
of rewriting the KafkaConsumer, and I think your proposal would work in
the new consumer because we are going to separate the user thread and the
background thread.  Here is the 1-pager, and we are in the process of
converting this into KIP-945.

Thanks,
P

On Tue, Jul 11, 2023 at 10:33 AM Philip Nee  wrote:


Hey Erik,

Sorry for holding up this email for a few days since Colin's response
includes some of my concerns.  I'm in favor of this KIP, and I think your
approach seems safe.  Of course, I probably missed something therefore I
think this KIP needs to cover different use cases to demonstrate it doesn't
cause any unsafe access. I think this can be demonstrated via diagrams and
some code in the KIP.

Thanks,
P

On Sat, Jul 8, 2023 at 12:28 PM Erik van Oosten
 wrote:


Hello Colin,

  >> In KIP-944, the callback thread can only delegate to another thread
after reading from and writing to a threadlocal variable, providing the
barriers right there.

  > I don't see any documentation that accessing thread local variables
provides a total store or load barrier. Do you have such documentation?
It seems like if this were the case, we could eliminate volatile
variables from most of the code base.

Now I was imprecise. The thread-locals are only somewhat involved. In
the KIP proposal the callback thread reads an access key from a
thread-local variable. It then needs to pass that access key to another
thread, which then can set it on its own thread-local variable. The act
of passing a value from one thread to another implies that a memory
barrier needs to be passed. However, this is all not so relevant since
there is no need to pass the access key back when the other thread is
done.

But now I think about it a bit more, the locking mechanism runs in a
synchronized block. If I remember correctly this should be enough to
pass read and write barriers.

  >> In the current implementation the consumer is also invoked from
random threads. If it works now, it should continue to work.
  > I'm not sure what you're referring to. Can you expand on this?

Any invocation of the consumer (e.g. method poll) is not from a thread
managed by the consumer. This is what I was assuming you meant with the
term 'random thread'.

  > Hmm, not sure what you mean by "cooperate with blocking code." If you
have 10 green threads you're multiplexing on to one 

Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-07-21 Thread Matthias J. Sax

Thanks for the KIP. Overall I like the idea to close this gap.

However, I am wondering if we should close other gaps first? In
particular, IIRC, we have a few cases for which we only have a RocksDB 
implementation for a store, and thus, adding an in-memory version for 
these stores first, to make the current `IN_MEMORY` parameter work, 
might be the first step?


In particular, this holds for the new versioned-store (but I actually
believe there is some other internal store with no in-memory
implementation). -- For `suppress()` it's actually the other way around:
we only have an in-memory implementation. Do you aim to allow custom
stores for `suppress()`, too?


Btw: Should versioned stores also be covered by the KIP (ie, 
`StoreTypeSpec`)? We did consider to add a new option `VERSIONED` to the 
existing `default.dsl.store` config, but opted out for various reasons.


Last, I am not sure if the new parameter replacing the existing one is
the best way to go? Did you put the idea to add `CUSTOM` to the existing
config into rejected alternatives? Personally, I would prefer to add
`CUSTOM` as I would like to optimize for ease of use for the majority of
users (who don't implement a custom store), but only switch to
in-memory sometimes. -- As an alternative, you could also just extend
`default.dsl.store` (it's just a String) and allow to pass in anything.
If it matches existing `ROCKS_DB` or `IN_MEMORY` we just process it as
we do now, and if not we assume it's a fully qualified class name and
try to instantiate it? -- Given that we plan to keep the store-enum, it
seems cleaner to keep the existing config and keep both the config and
enum aligned to each other?
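
To illustrate the alternative described above (accept any String, treat the known enum values as today, otherwise try to load it as a class), here is a rough sketch in plain Java. It is not the KIP's design and not how `AbstractConfig` works today: `DslStoreConfigSketch`, `StoreSupplierFactory`, and `MyCustomStores` are hypothetical names, and the built-in branches just return placeholders.

```java
/** Hypothetical sketch of "enum value or fully qualified class name" resolution. */
final class DslStoreConfigSketch {

    /** Stand-in (assumption) for whatever interface a custom store type would implement. */
    interface StoreSupplierFactory {
        String describe();
    }

    /** Example custom implementation; it needs a public no-arg constructor. */
    static final class MyCustomStores implements StoreSupplierFactory {
        public MyCustomStores() { }

        @Override
        public String describe() {
            return "custom";
        }
    }

    /** Resolves the config value: known names first, otherwise a class name. */
    static StoreSupplierFactory resolve(String configValue) {
        switch (configValue) {
            case "ROCKS_DB":
                return () -> "rocksdb";   // process as we do now (placeholder)
            case "IN_MEMORY":
                return () -> "in-memory"; // process as we do now (placeholder)
            default:
                return instantiate(configValue);
        }
    }

    /** Loads the named class reflectively and checks that it has the expected type. */
    private static StoreSupplierFactory instantiate(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(StoreSupplierFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException(
                    "Neither a built-in store type nor a usable class: " + configValue, e);
        }
    }
}
```

One trade-off visible in the sketch: a typo such as `IN_MEMROY` is no longer rejected by an enum validator but only fails later as an unknown class name, which is part of why the reply in this thread calls it a departure from convention.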



It's just a preliminary thought. I will need to go back and take a more
detailed look into the code to grok how the proposed `StoreTypeSpec`
falls into place. Also wondering how it would relate to the existing
`Stores` factory?


-Matthias


On 7/21/23 6:45 AM, Colt McNealy wrote:

Sophie—

Thanks for chiming in here. +1 to the idea of specifying the ordering
guarantees that we make in the StoreTypeSpec javadocs.

Quick question then. Is the javadoc that says:


Order is not guaranteed as bytes lexicographical ordering might not
represent key order.

no longer correct, and should say:


Order guarantees depend on the underlying implementation of the
ReadOnlyKeyValueStore. For more information, please consult the
[StoreTypeSpec javadocs]()

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*


On Thu, Jul 20, 2023 at 9:28 PM Sophie Blee-Goldman 
wrote:


Hey Almog, first off, thanks for the KIP! I (and others) raised concerns
over how restrictive the default.dsl.store config would be if not
extendable to custom store types, especially given that this seems to be
the primary userbase of such a feature. At the time we didn't really have
any better ideas for a clean way to achieve that, but what you proposed
makes a lot of sense to me. Happy to see a good solution to this, and
hopefully others will share my satisfaction :P

I did have one quick piece of feedback which arose from an unrelated
question posed to the dev mailing list w/ subject line
"ReadOnlyKeyValueStore#range() Semantics"
<https://lists.apache.org/thread/jbckmth8d3mcgg0rd670cpvsgwzqlwrm>. I
recommend checking out the full thread for context, but it made me think
about how we can leverage the new StoreTypeSpec concept as an answer to the
long-standing question in Streams: where can we put guarantees of the
public contract for RocksDB (or other store implementations) when all the
RocksDB stuff is technically internal.

Basically, I'm suggesting two things: first, call out in some way (perhaps
the StoreTypeSpec javadocs) that each StoreTypeSpec is considered a public
contract in itself and should outline any semantic guarantees it does, or
does not, make. Second, we should add a note on ordering guarantees in the
two OOTB specs: for RocksDB we assert that range queries will honor
serialized byte ordering, whereas the InMemory flavor gives no ordering
guarantee whatsoever at this time.

Thoughts?

-Sophie

On Thu, Jul 20, 2023 at 4:28 PM Almog Gavra  wrote:


Hi All,

I would like to propose a KIP to expand support for default store types
(KIP-591) to encompass custom store implementations:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-954%3A+expand+default+DSL+store+configuration+to+custom+types


Looking forward to your feedback!

Cheers,
Almog







[jira] [Comment Edited] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745690#comment-17745690
 ] 

Matthias J. Sax edited comment on KAFKA-15116 at 7/21/23 6:18 PM:
--

{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamThread has its own shard. A single key goes into a single 
shard (ie, must go into a single shard – otherwise you break the system) by 
partitioning the data by key.
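
As a rough illustration of that sharding rule, the sketch below routes every key to exactly one shard, so a key can never end up in two shards. It is plain Java with hypothetical names (`ShardedStoreSketch`, `shardFor`); the hash is a stand-in, whereas Kafka's default partitioner hashes the serialized key bytes with murmur2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a store that is sharded by key; not Kafka Streams code. */
final class ShardedStoreSketch {
    private final List<Map<String, String>> shards = new ArrayList<>();

    ShardedStoreSketch(int numShards) {
        for (int i = 0; i < numShards; i++) {
            shards.add(new HashMap<>());
        }
    }

    /** Stand-in partitioner (assumption): deterministic hash modulo the shard count. */
    int shardFor(String key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    /** Writes always go to the one shard that owns the key. */
    void put(String key, String value) {
        shards.get(shardFor(key)).put(key, value);
    }

    /** Reads consult only the owning shard. */
    String get(String key) {
        return shards.get(shardFor(key)).get(key);
    }

    /** Number of shards that contain the key; by construction this is 0 or 1. */
    long shardsContaining(String key) {
        return shards.stream().filter(shard -> shard.containsKey(key)).count();
    }
}
```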
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking (blocking to me really means "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, if Kafka Streams commits, all pending transactions are committed. So 
if you are saying, you want to accumulate 3 messages for a key, but so far only 
2 messages got processed, 2 messages would be written into the state store and 
changelog topic on commit. But that is by design and correct. As said above, 
you should not read from the changelog topic. The right thing to do would be, 
to change your processor and let it write into an output topic if all 3 
messages are there (and never write a partial result into this topic), and read 
from this output topic instead of the changelog (in case I did understand the 
scenario you describe correctly).
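
The suggested processor change can be sketched as follows, in plain Java rather than the Processor API: partial state stays in the store, and a result is written to the output only once all expected messages for a key have arrived. The class and field names are hypothetical; the `HashMap` stands in for a state store and the `List` for the output topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "emit only complete results"; not Kafka Streams code. */
final class EmitWhenCompleteSketch {
    private final int expectedPerKey;
    private final Map<String, List<String>> store = new HashMap<>(); // stands in for the state store
    private final List<String> output = new ArrayList<>();           // stands in for the output topic

    EmitWhenCompleteSketch(int expectedPerKey) {
        this.expectedPerKey = expectedPerKey;
    }

    /** Buffers the value; forwards a combined result only when the group is complete. */
    void process(String key, String value) {
        List<String> buffered = store.computeIfAbsent(key, k -> new ArrayList<>());
        buffered.add(value);
        if (buffered.size() == expectedPerKey) {
            output.add(key + "=" + String.join("+", buffered));
            store.remove(key); // the partial state is no longer needed
        }
    }

    /** Downstream readers see only complete results, never partial ones. */
    List<String> output() {
        return output;
    }
}
```

With this shape, a commit in the middle of a group only persists the buffered state; a consumer of the output never observes a partial result.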
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebalance but does not complete them (transaction 
commit)
{quote}
I think you make incorrect assumptions about how processing works (and what a 
transaction in Kafka is). A transaction is really just to guard against 
failures – it has no _semantic_ meaning in Kafka that would align to your 
business logic (there is no "begin TX" or "commit TX" calls exposed in Kafka 
Streams that you could use to align TX to your business logic – and you don't 
have to).
{quote}Even though pausing processing during a rebalance probably shouldn't be 
default behaviour it would be ideal for us if it were configurable.
{quote}
This was the old "eager rebalancing" and it was changed because there is 
actually no reason to "stop the world" during a rebalance. Also I am not sure 
how it would help your case? Even if we stop processing during a rebalance, we 
would need to commit the open TX when rebalancing starts. So nothing really 
changes.
{quote}Pausing consumption feels valid especially when there is a dependency 
between messages with the same partition key?
{quote}
How should the system know if there is a dependency? It seems you are not 
writing your app in the proper way and may make incorrect assumptions about how 
Kafka is designed?


was (Author: mjsax):
{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamThread has its own shard.
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking (blocking to me really means "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, if Kafka Streams commits, all pending transactions are committed. So 
if you are saying, you want to accumulate 3 messages for a key, but so far only 
2 messages got processed, 2 messages would be written into the state store and 
changelog topic on commit. But that is by design and correct. As said above, 
you should not read from the changelog topic. The right thing to do would be, 
to change your processor and let it write into an output topic if all 3 
messages are there (and never write a partial result into this topic), and read 
from this output topic instead of the changelog (in case I did understand the 
scenario you describe correctly).
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebal

[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17745690#comment-17745690
 ] 

Matthias J. Sax commented on KAFKA-15116:
-

{quote}The internal store is shared across stream threads.
{quote}
That is not how Kafka Streams works. If you have a store, the store is sharded 
and each StreamThread has its own shard.
{quote}There is a consumer outside of kafka streams that is reading 
"read_committed" messages that populates the store and unblocks the processor.
{quote}
Are you saying you are reading the corresponding changelog topic? That is not 
recommended in general, as it's considered an implementation detail. It's still 
not clear why anything would be "blocked" or how this external consumer would 
do the unblocking (blocking to me really means "to wait / block the thread").
{quote}In this context I'm talking about eos and the transaction being 
committed and therefore the consumer being able to read the "read_committed" 
message.
{quote}
Well yes, if Kafka Streams commits, all pending transactions are committed. So 
if you are saying, you want to accumulate 3 messages for a key, but so far only 
2 messages got processed, 2 messages would be written into the state store and 
changelog topic on commit. But that is by design and correct. As said above, 
you should not read from the changelog topic. The right thing to do would be, 
to change your processor and let it write into an output topic if all 3 
messages are there (and never write a partial result into this topic), and read 
from this output topic instead of the changelog (in case I did understand the 
scenario you describe correctly).
{quote}I think ultimately our problem is that the stream thread carries on 
processing messages during a rebalance but does not complete them (transaction 
commit)
{quote}
I think you make incorrect assumptions about how processing works (and what a 
transaction in Kafka is). A transaction is really just to guard against 
failures – it has no _semantic_ meaning in Kafka that would align to your 
business logic (there is no "begin TX" or "commit TX" calls exposed in Kafka 
Streams that you could use to align TX to your business logic – and you don't 
have to).
{quote}Even though pausing processing during a rebalance probably shouldn't be 
default behaviour it would be ideal for us if it were configurable.
{quote}
This was the old "eager rebalancing" and it was changed because there is 
actually no reason to "stop the world" during a rebalance. Also I am not sure 
how it would help your case? Even if we stop processing during a rebalance, we 
would need to commit the open TX when rebalancing starts. So nothing really 
changes.
{quote}Pausing consumption feels valid especially when there is a dependency 
between messages with the same partition key?
{quote}
How should the system know if there is a dependency? It seems you are not 
writing your app in the proper way and may make incorrect assumptions about how 
Kafka is designed?

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a message, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed





[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744726#comment-17744726
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

{quote}but although {{StreamsPartitionAssignor}} sometimes calls it a client ID 
and sometimes a process ID it's a {{UUID}} so I assume it really is the process 
ID.
{quote}
Thanks for calling this out. You are right; I missed this point.

As you did mention "max recovery lag", I assume you have a stateful app that 
uses in-memory stores only?

Another thing coming to my mind: the `client.id` actually has a different purpose 
and should not be unique per `KafkaStreams` instance, but should be the _same_ 
for all instances (the name is a little bit misleading). For example, if you 
configure quotas, it's based on `client.id` and you usually want quotas to be 
set per application, not per instance.
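
A small configuration sketch of that split, following the comment above: the same `client.id` on every instance, and a per-instance `group.instance.id` taken from the stable container name, as the reporter already does. The helper `configFor` and the example values are hypothetical; only the three config keys are real Kafka configuration names.

```java
import java.util.Properties;

/** Hypothetical helper that builds per-instance settings for a Streams application. */
final class StreamsIdsSketch {

    static Properties configFor(String applicationId, String podName) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        // Same value on every instance, e.g. so quotas apply per application.
        props.setProperty("client.id", applicationId);
        // Unique and stable per instance, e.g. a Kubernetes StatefulSet pod name.
        props.setProperty("group.instance.id", podName);
        return props;
    }
}
```

For example, `configFor("orders-app", "orders-app-0")` and `configFor("orders-app", "orders-app-1")` share a `client.id` but differ in `group.instance.id`.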

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.





[jira] [Comment Edited] (KAFKA-15190) Allow configuring a streams process ID

2023-07-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288
 ] 

Matthias J. Sax edited comment on KAFKA-15190 at 7/18/23 6:22 PM:
--

One more thing: the `process.id` is actually only used as part of the 
`client.id` iff no `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of task shuffling (and the rebalance in itself should 
not be an issue, as it's cheap)?


was (Author: mjsax):
One more thing: the `process.id` is actually only used as part of the 
`client.id` iff no `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of rebalancing (and task shuffling)?

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.





[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744288#comment-17744288
 ] 

Matthias J. Sax commented on KAFKA-15190:
-

One more thing: the `process.id` is actually only used as part of the 
`client.id` iff no `client.id` config is set. – Hence, setting the `client.id` 
should avoid the issue of rebalancing (and task shuffling)?

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID in the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.





[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2023-07-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13295.
-
Resolution: Fixed

With the new restore-thread, this issue should be resolved implicitly.

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
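
The description above ties the failure to restoration outlasting `transaction.timeout.ms`. As a hedged illustration of where that setting lives (this is a stopgap that follows from the description, not the fix named in the resolution), the sketch below builds a Streams configuration with plain `java.util.Properties`. The application name, the bootstrap address, the `expectedRestoreMs` estimate, and the 2x headroom are all assumptions.

```java
import java.util.Properties;

// Sketch only: sizes the producer transaction timeout from an expected restore time.
class EosTimeoutConfig {

    // expectedRestoreMs is a hypothetical, application-specific estimate.
    static Properties build(long expectedRestoreMs) {
        Properties props = new Properties();
        props.setProperty("application.id", "eos-app");           // hypothetical name
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("processing.guarantee", "exactly_once_v2");
        // The "producer." prefix hands the setting to the embedded producer client.
        // The 2x headroom is an arbitrary choice for this sketch.
        long timeoutMs = 2 * expectedRestoreMs;
        props.setProperty("producer.transaction.timeout.ms", Long.toString(timeoutMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build(120_000L).getProperty("producer.transaction.timeout.ms"));
    }
}
```

Note that the broker-side `transaction.max.timeout.ms` bounds what a producer may request, so the value cannot be raised arbitrarily from the client alone.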





Re: [VOTE] KIP-941 Range queries to accept null lower and upper bounds

2023-07-10 Thread Matthias J. Sax

+1 (binding)

On 7/10/23 12:13 PM, Bill Bejeck wrote:

Hi Lucia,

Thanks for the KIP! It will be a welcome improvement.

+1(binding)

-Bill

On Mon, Jul 10, 2023 at 2:40 PM Lucia Cerchie 
wrote:


Hello everyone,

I'd like to call a vote on KIP-941
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds>.

It has been under discussion since June 26, and has received edits to the
KIP and approval by discussion participants.

Best,
Lucia


Re: [DISCUSS] KIP-932: Queues for Kafka

2023-07-10 Thread Matthias J. Sax

There is another detail about EOS that is important I guess.

Messages written into topic-partitions, are only marked as 
"transactional", but when we commit (or abort), we only write an 
additional "tx marker" into the partition (the original message is not 
touched). If we deliver "pending" messages, the client would need 
additional logic to buffer pending messages, plus logic to evaluate 
tx-markers to determine if/when a pending record could be processed if 
committed or discarded if aborted. The current client has nothing like 
this built-in, because we don't need to (as explained in the original 
message, why we don't read beyond the LSO).
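
To make the extra client logic concrete, here is a rough sketch of what "buffer pending messages and evaluate tx-markers" could look like. It is a toy model, not client code: the `Entry` type, the `deliver` method, and the idea that every record in the log is transactional are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: models a client that reads past the LSO and must buffer pending records.
class PendingBufferSketch {

    enum Kind { DATA, COMMIT, ABORT }

    // One log entry: a transactional data record, or a tx marker for a producer.
    static final class Entry {
        final Kind kind;
        final long producerId;
        final String value; // null for markers

        Entry(Kind kind, long producerId, String value) {
            this.kind = kind;
            this.producerId = producerId;
            this.value = value;
        }
    }

    // Returns record values in the order they become deliverable.
    static List<String> deliver(List<Entry> log) {
        Map<Long, List<String>> pending = new HashMap<>(); // buffered per open transaction
        List<String> delivered = new ArrayList<>();
        for (Entry e : log) {
            switch (e.kind) {
                case DATA:
                    pending.computeIfAbsent(e.producerId, id -> new ArrayList<>()).add(e.value);
                    break;
                case COMMIT:
                    List<String> committed = pending.remove(e.producerId);
                    if (committed != null) {
                        delivered.addAll(committed);
                    }
                    break;
                case ABORT:
                    pending.remove(e.producerId); // discard without delivering
                    break;
            }
        }
        return delivered; // anything still buffered has no marker yet
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry(Kind.DATA, 1L, "a"));
        log.add(new Entry(Kind.DATA, 2L, "b"));
        log.add(new Entry(Kind.ABORT, 2L, null));
        log.add(new Entry(Kind.DATA, 1L, "c"));
        log.add(new Entry(Kind.COMMIT, 1L, null));
        System.out.println(deliver(log));
    }
}
```

The `pending` map is exactly the buffer whose size is unbounded while a transaction stays open, which is the memory concern raised in this thread.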


Or we would need to have a different way to let the client know when a 
pending message is not pending any longer, and if it was committed or 
aborted. For example, we could change the client so it would always drop 
pending messages, and it would be the broker's responsibility to 
re-deliver them after they got committed. So the client won't need to 
buffer (good); however, given how the broker works, doing it this way 
seems very undesirable.


Maybe there are other options? In the end, it's always going to be much 
more complex, so it's not clear if it would be worth the effort, or if we 
should just do what we do now, not read beyond the LSO, and keep it simple?



-Matthias

On 7/10/23 2:43 AM, Dániel Urbán wrote:

Yes, I think it's clear now, thank you.
I agree that allowing reading behind the LSO would require more work on the
broker side (we would need 1 more state for the messages, and transition
when the LSO moves forward), but I don't see the extra complexity on the
consumer side. Based on the KIP so far, brokers will be able to return
specific batches/messages to queue consumers - consumers will need to be
able to skip messages in case another consumer of the same group has
already acquired/acked those. If we have this logic present in the protocol
and the clients, consumers could skip pending messages using the same
mechanism, and only the broker would need to know *why* exactly a specific
record/batch is skipped.

I don't think that this feature would be too important, but compared to the
complexity of the KIP, 1 more state doesn't seem too complicated to me.

Thanks,
Daniel

On Mon, Jul 10, 2023 at 7:22 Matthias J. Sax wrote:


Daniel, sure.

To allow the client to filter aborted messages, the broker currently
attaches metadata that tells the client which records were aborted. But
the first message after the LSO is a message in pending state, ie, it
was neither committed nor aborted yet, so it's not possible to filter or
deliver it. Thus, the broker cannot provide this metadata (not sure if
the client could filter without this metadata?)

The main reason why this happens broker side is to avoid that the client
needs to buffer pending messages "indefinitely" until the TX might
eventually commit or abort, and thus put a lot of memory pressure on the
client. For the "classic" case, the situation is more severe as we
guarantee ordered delivery, and thus, the client would need to buffer
everything after the LSO. -- While it's relaxed for queuing as we might
not guarantee order (ie, instead of buffering everything, only pending
messages must be buffered), it would still imply a huge additional
burden on tracking metadata (for both the broker and the consumer), and
the wire protocol, and I am already worried about the metadata we might
need to track for queuing in general.

Does this make sense?


-Matthias



On 7/7/23 01:35, Dániel Urbán wrote:

Hi Matthias,
Can you please elaborate on this: "First, you need to understand that
aborted records are filtered client side, and thus for "read-committed" we
can never read beyond the LSO, and the same seems to apply for queuing."
I don't understand the connection here - what does skipping aborted records
have to do with the LSO? As you said, aborted message filtering is done on
the client side (in consumers, yes, but not sure if it has to be the same
for queues), but being blocked on the LSO is the responsibility of the
broker, isn't it? My thought was that the broker could act differently when
working with queues and read_committed isolation.
Thanks,
Daniel

On Thu, Jul 6, 2023 at 7:26 PM Matthias J. Sax  wrote:


Thanks for the KIP.

It seems we are at a very early stage, and some very important sections in
the KIP are still marked as TODO. In particular, I am curious about the
protocol changes, how the "queuing state" will be represented and made
durable, and all the error edge cases / fail-over / fencing
(broker/clients) that we need to put in place.


A few other comments/question from my side:

(1) Fetch from follower: this was already touched on, but the point is
really that the consumer does not decide about it, but the broker does.
When a consumer sends its first fetch request it will always go to the

Re: Testing FixedKeyProcessor implementation using unit tests

2023-07-10 Thread Matthias J. Sax

Not sure right now, but could be a bug.

Can you maybe share the full stack trace and the test program?

-Matthias

On 7/10/23 3:47 AM, EXT.Zlatibor.Veljkovic wrote:

Hi, I am using kafka-streams-test-utils and have a problem testing 
FixedKeyProcessor [KIP-820 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API#KIP820:ExtendKStreamprocesswithnewProcessorAPI-InfrastructureforFixedKeyRecords].

Using mock processor context to get the forwarded message doesn't work.

class org.apache.kafka.streams.processor.api.MockProcessorContext cannot be 
cast to class org.apache.kafka.streams.processor.api.FixedKeyProcessorContext

Anything I can do to get forwarded records?

Thanks,
Zed



Re: [ANNOUNCE] New committer: Greg Harris

2023-07-10 Thread Matthias J. Sax

Congrats!

On 7/10/23 8:45 AM, Chris Egerton wrote:

Hi all,

The PMC for Apache Kafka has invited Greg Harris to become a committer, and
we are happy to announce that he has accepted!

Greg has been contributing to Kafka since 2019. He has made over 50 commits
mostly around Kafka Connect and Mirror Maker 2. His most notable
contributions include KIP-898: "Modernize Connect plugin discovery" and a
deep overhaul of the offset syncing logic in MM2 that addressed several
technically-difficult, long-standing, high-impact issues.

He has also been an active participant in discussions and reviews on the
mailing lists and on GitHub.

Thanks for all of your contributions, Greg. Congratulations!



Re: [DISCUSS] KIP-932: Queues for Kafka

2023-07-09 Thread Matthias J. Sax

Daniel, sure.

To allow the client to filter aborted messages, the broker currently 
attaches metadata that tells the client which records were aborted. But 
the first message after the LSO is a message in pending state, ie, it 
was neither committed nor aborted yet, so it's not possible to filter or 
deliver it. Thus, the broker cannot provide this metadata (not sure if 
the client could filter without this metadata?)


The main reason why this happens broker side is to avoid that the client 
needs to buffer pending messages "indefinitely" until the TX might 
eventually commit or abort, and thus put a lot of memory pressure on the 
client. For the "classic" case, the situation is more severe as we 
guarantee ordered delivery, and thus, the client would need to buffer 
everything after the LSO. -- While it's relaxed for queuing as we might 
not guarantee order (ie, instead of buffering everything, only pending 
messages must be buffered), it would still imply a huge additional 
burden on tracking metadata (for both the broker and the consumer), and 
the wire protocol, and I am already worried about the metadata we might 
need to track for queuing in general.
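
The relationship between the LSO and aborted-record filtering described above can be sketched as a small model. This is a deliberate simplification: it folds the broker-side bound (the LSO) and the client-side filtering into one class, and the `Rec` type and both method names are assumptions made for this sketch, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a simplified model of the LSO and a read_committed fetch.
class LsoSketch {

    enum State { COMMITTED, ABORTED, PENDING }

    static final class Rec {
        final long offset;
        final State state;

        Rec(long offset, State state) {
            this.offset = offset;
            this.state = state;
        }
    }

    // LSO modeled as the offset of the first pending record, or the log end if none.
    static long lastStableOffset(List<Rec> log) {
        for (Rec r : log) {
            if (r.state == State.PENDING) {
                return r.offset;
            }
        }
        return log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
    }

    // Offsets a read_committed reader would see: below the LSO, aborted ones filtered out.
    static List<Long> readCommitted(List<Rec> log) {
        long lso = lastStableOffset(log);
        List<Long> visible = new ArrayList<>();
        for (Rec r : log) {
            if (r.offset >= lso) {
                break; // never read beyond the LSO
            }
            if (r.state != State.ABORTED) {
                visible.add(r.offset);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec(0L, State.COMMITTED));
        log.add(new Rec(1L, State.ABORTED));
        log.add(new Rec(2L, State.COMMITTED));
        log.add(new Rec(3L, State.PENDING));
        log.add(new Rec(4L, State.COMMITTED));
        System.out.println("LSO=" + lastStableOffset(log) + " visible=" + readCommitted(log));
    }
}
```

In this model the committed record at offset 4 stays invisible until the pending record at offset 3 is resolved, which is the blocking behavior the question was about.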


Does this make sense?


-Matthias



On 7/7/23 01:35, Dániel Urbán wrote:

Hi Matthias,
Can you please elaborate on this: "First, you need to understand that
aborted records are filtered client side, and thus for "read-committed" we
can never read beyond the LSO, and the same seems to apply for queuing."
I don't understand the connection here - what does skipping aborted records
have to do with the LSO? As you said, aborted message filtering is done on
the client side (in consumers, yes, but not sure if it has to be the same
for queues), but being blocked on the LSO is the responsibility of the
broker, isn't it? My thought was that the broker could act differently when
working with queues and read_committed isolation.
Thanks,
Daniel

On Thu, Jul 6, 2023 at 7:26 PM Matthias J. Sax  wrote:


Thanks for the KIP.

It seems we are at a very early stage, and some very important sections in
the KIP are still marked as TODO. In particular, I am curious about the
protocol changes, how the "queuing state" will be represented and made
durable, and all the error edge cases / fail-over / fencing
(broker/clients) that we need to put in place.


A few other comments/question from my side:

(1) Fetch from follower: this was already touched on, but the point is
really that the consumer does not decide about it, but the broker does.
When a consumer sends its first fetch request it will always go to the
leader, and the broker would reply to the consumer "go and fetch from
this other broker". -- I think it's ok to exclude fetch from follower in
the first version of the KIP, but it would need a broker change such
that the broker knows it's a "queue fetch" request. -- It would also be
worth exploring how fetch from follower could work in the future and
ensuring that our initial design allows for it and is future proof.


(2) Why do we not allow pattern subscription and what happens if
different consumers subscribe to different topics? It's not fully
explained in the KIP.


(3) auto.offset.reset and SPSO/SPSE -- I don't understand why we would
not allow auto.offset.reset? In the discussion, you mentioned that
"first consumer would win, if two consumers have a different config" --
while this is correct, it's the same for a consumer group right now.
Maybe we should not try to solve a "non problem"? -- In general, my
impression is that we are going to do Kafkaesque queuing, which is fine,
but it might be to our advantage to carry over as many established
concepts as we can? And if not, have a very good reason not to.

In the end, I find it very clumsy to only have an admin API to change
the starting point of a consumer.

(3B) What happens if lag grows and data is purged broker side?

(3C) What happens if the broker released records (based on "timeout /
exceeding delivery count"), and the "ack/reject" comes afterwards?

(3D) How to find out what records got archived but were not acked (ie,
lost) for re-processing/debugging purpose? The question was already
asked and the answer was "not supported", but I think it would be
must-have before the feature is usable in production? We can of course
also only do it in a future release and not the first "MVP"
implementation, but the KIP should address it. In the end, the overall
group monitoring story is missing.


(4) I am also wondering about the overall design with regard to "per
record" vs "per batch" granularity. In the end, queuing usually aims for
"per records" semantics, but "per record" implies to keep track of a lot
of metadata. Kafka is designed on a "per batch" granularity, and it's
unclear to me how both will go together?

(4A) Do we keep "ack/reject

Re: [DISCUSS] KIP-932: Queues for Kafka

2023-07-06 Thread Matthias J. Sax

Thanks for the KIP.

It seems we are at a very early stage, and some very important sections in 
the KIP are still marked as TODO. In particular, I am curious about the 
protocol changes, how the "queuing state" will be represented and made 
durable, and all the error edge cases / fail-over / fencing 
(broker/clients) that we need to put in place.



A few other comments/question from my side:

(1) Fetch from follower: this was already touched on, but the point is 
really that the consumer does not decide about it, but the broker does. 
When a consumer sends its first fetch request it will always go to the 
leader, and the broker would reply to the consumer "go and fetch from 
this other broker". -- I think it's ok to exclude fetch from follower in 
the first version of the KIP, but it would need a broker change such 
that the broker knows it's a "queue fetch" request. -- It would also be 
worth exploring how fetch from follower could work in the future and 
ensuring that our initial design allows for it and is future proof.



(2) Why do we not allow pattern subscription and what happens if 
different consumers subscribe to different topics? It's not fully 
explained in the KIP.



(3) auto.offset.reset and SPSO/SPSE -- I don't understand why we would 
not allow auto.offset.reset? In the discussion, you mentioned that 
"first consumer would win, if two consumers have a different config" -- 
while this is correct, it's the same for a consumer group right now. 
Maybe we should not try to solve a "non problem"? -- In general, my 
impression is that we are going to do Kafkaesque queuing, which is fine, 
but it might be to our advantage to carry over as many established 
concepts as we can? And if not, have a very good reason not to.
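
The "first consumer would win" behavior for a regular consumer group can be illustrated with a toy model of how a start position is chosen. The method name and parameters below are assumptions for this sketch only; the real consumer resolves this internally and reports a missing offset with its own exception type.

```java
// Sketch only: why the first consumer's reset policy "wins" within a group.
class OffsetResetSketch {

    // committed is null while the group has no committed offset for the partition.
    static long startPosition(Long committed, String resetPolicy, long logStart, long logEnd) {
        if (committed != null) {
            return committed; // the reset policy is not consulted
        }
        if ("earliest".equals(resetPolicy)) {
            return logStart;
        }
        if ("latest".equals(resetPolicy)) {
            return logEnd;
        }
        throw new IllegalStateException("no committed offset and policy is " + resetPolicy);
    }

    public static void main(String[] args) {
        // Consumer A ("earliest") gets the partition first and commits its position.
        long a = startPosition(null, "earliest", 0L, 100L);
        Long committed = a;
        // Consumer B ("latest") takes over later: the committed offset overrides its policy.
        long b = startPosition(committed, "latest", 0L, 100L);
        System.out.println(a + " " + b);
    }
}
```

Once any member has committed, a differing `auto.offset.reset` on another member no longer matters, which is why mixed settings are already tolerated in today's groups.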


In the end, I find it very clumsy to only have an admin API to change 
the starting point of a consumer.


(3B) What happens if lag grows and data is purged broker side?

(3C) What happens if the broker released records (based on "timeout / 
exceeding delivery count"), and the "ack/reject" comes afterwards?


(3D) How to find out what records got archived but were not acked (ie, 
lost) for re-processing/debugging purpose? The question was already 
asked and the answer was "not supported", but I think it would be 
must-have before the feature is usable in production? We can of course 
also only do it in a future release and not the first "MVP" 
implementation, but the KIP should address it. In the end, the overall 
group monitoring story is missing.



(4) I am also wondering about the overall design with regard to "per 
record" vs "per batch" granularity. In the end, queuing usually aims for 
"per records" semantics, but "per record" implies to keep track of a lot 
of metadata. Kafka is designed on a "per batch" granularity, and it's 
unclear to me how both will go together?


(4A) Do we keep "ack/reject/..." state per-record, or per batch? It 
seems per record, but it would require holding a lot of metadata. Also, 
how does it work for the current protocol, if a batch is partially acked 
and we need to re-deliver? Would we add metadata and then let the client 
filter acked messages (similar to how "read-committed" mode works)?


(4B) What exactly does "the share-partition leader prefers to return complete 
record batches." mean? "Prefers" is a fuzzy word. What happens 
if we cannot return a complete record batch?


(4C) What happens if different consumers of the same group configure 
different batch sizes for fetching records? How do we track the 
corresponding meta-data?


(4D)

"In the situation where some records in a batch have been released or rejected 
separately, subsequent fetches of those records are more likely to have gaps."

What does this mean?

(4E)

"For efficiency, the consumer preferentially returns complete record sets with 
no gaps"

Can you elaborate on the details?


API contract:

(5A)

"acks must be issued in the order in which the records appear"


Why is this the case? Sounds like an arbitrary restriction to me? Can 
you share your reasoning?



(5B) How to "reject" (or just "release") all records of a batch at once? 
It seems the API only allows to "ack" all records of a batch at once.


(5C) Currently, `ConsumerRecords` object may contain records from 
different partitions? Would this still be the case?



(6) Group management / re-balancing:

(6A) The KIP should explain better how heart-beating works (was already 
partially discussed). How does `max.poll.interval.ms` interact? Would it 
trigger a "release" of records if violated?


(6B) You mentioned that a consumer that does not heartbeat would just be 
removed from the group with a rebalance: Given the current design to 
assign all partitions to every consumer in the group, that would be ok. 
But as you mentioned on the KIP, we might want to be more clever with 
regard to assigning partitions in the future, and I think we would 
actually need to trigger a rebalance to avoid a later protocol change: 
otherwise, 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2023-06-29 Thread Matthias J. Sax

Shay,

thanks for picking up this KIP. It's a pity that the discussion stalled 
for such a long time.


As expressed previously, I am happy with the name `markAsPartitioned()` 
and also believe it's ok to just document the impact and leave it to the 
user to do the right thing.


If we really get a lot of users that ask about it, because they did not 
do the right thing, we could still add something (eg, a reverse-mapper 
function) in a follow-up KIP. But we don't know if it's necessary; thus, 
making a small incremental step sounds like a good approach to me.


Let's see if others agree or not.


-Matthias

On 6/28/23 5:29 PM, Shay Lin wrote:

Hi all,

Great discussion thread. May I take this KIP up? If it’s alright my plan is
to update the KIP with the operator `markAsPartitioned()`.

As you have discussed and pointed out, there are implications to downstream
joins or aggregation operations. Still, the operator is intended for
advanced users so my two cents is it would be a valuable addition
nonetheless. We could add this as a caution/consideration as part of the
java doc.

Let me know, thanks.
Shay



Re: [DISCUSS] KIP-941: Range queries to accept null lower and upper bounds

2023-06-29 Thread Matthias J. Sax

Thanks for the KIP. LGTM.

I believe you can start a vote.

-Matthias

On 6/26/23 11:25 AM, Lucia Cerchie wrote:

Thanks for asking for clarification, Sophie; that gives me guidance on
improving the KIP! Here's the updated version, including the JIRA link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds


On Thu, Jun 22, 2023 at 12:57 PM Sophie Blee-Goldman 
wrote:


Hey Lucia, thanks for the KIP! Just some minor notes:

I'm in favor of the proposal overall, at least I think so -- for someone
not intimately familiar with the new IQ API and *RangeQuery* class, the KIP
was a bit difficult to follow along and I had to read in between the lines
to figure out what the old behavior was and what the new and improved logic
would do.

It would be good to state clearly in the beginning what happens when null
is passed in right now, and what will happen after this KIP is implemented.
For example in the "Public Interfaces" section, I couldn't tell if the
middle sentence was describing what was changing, or what it was changing
*to.*

One last little thing is can you link to the jira ticket at the top? And
please create one if it doesn't already exist -- it helps people figure out
when a KIP has been implemented and in which versions, as well as navigate
from the KIP to the actual code that was merged. Things can change during
implementation and the KIP document is how most people read up on new
features, but almost all of us are probably guilty of forgetting to update
the KIP document. So it's important to be able to find the code when in
doubt.

Otherwise nice KIP!

On Thu, Jun 22, 2023 at 8:19 AM Lucia Cerchie wrote:


Thanks Kirk and John for the valuable feedback!

John, I'll update the KIP to reflect that nuance you mention -- yes it is
just about making the withRange method more permissive. Thanks for the
testing file as well, I'll be sure to write my test cases there.

On Wed, Jun 21, 2023 at 10:50 AM Kirk True  wrote:


Hi John/Lucia,

Thanks for the feedback!

Of course I only noticed the private-ness of the RangeQuery constructor
moments after sending my email ¯\_(ツ)_/¯

Just to be clear, I’m happy with the proposed change as it conforms to
Postel’s Law ;) Apologies that it was worded tersely.

Thanks,
Kirk


On Jun 21, 2023, at 10:20 AM, John Roesler wrote:


Hi all,

Thanks for the KIP, Lucia! This is a nice change.

To Kirk's question (1), the example is a bit misleading. The typical
case that would ease user pain is specifically using "null" to indicate an
open-ended range, especially since null is not a valid key.

I could additionally see an empty string as being nice, but the actual
API is generic, not String, so there's no meaningful concept of
empty/blank/whitespace that we could check for, just null or not.

Regarding (2), there's no public factory that takes Optional parameters.
I think you're looking at the private constructor. An alternative Lucia
could consider is to instead propose adding a new factory like
`withRange(Optional lower, Optional upper)`.
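
For readers less familiar with the API, the idea of a null bound meaning "open-ended" can be sketched with a small stand-in class. `RangeSketch` is hypothetical and is not the real `RangeQuery`; the `apply` method over a `NavigableMap` and the inclusive bounds are assumptions chosen to keep the example self-contained.

```java
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Sketch only: a hypothetical stand-in, not the real RangeQuery class.
final class RangeSketch<K> {
    private final Optional<K> lower;
    private final Optional<K> upper;

    private RangeSketch(Optional<K> lower, Optional<K> upper) {
        this.lower = lower;
        this.upper = upper;
    }

    // A null bound means "open-ended" on that side.
    static <K> RangeSketch<K> withRange(K lower, K upper) {
        return new RangeSketch<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
    }

    // Applies the range to a sorted map; inclusive bounds are an assumption of this sketch.
    <V> NavigableMap<K, V> apply(NavigableMap<K, V> store) {
        NavigableMap<K, V> view = store;
        if (lower.isPresent()) {
            view = view.tailMap(lower.get(), true);
        }
        if (upper.isPresent()) {
            view = view.headMap(upper.get(), true);
        }
        return view;
    }

    public static void main(String[] args) {
        TreeMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        store.put("d", 4);
        System.out.println(RangeSketch.<String>withRange("b", null).apply(store).keySet());
        System.out.println(RangeSketch.<String>withRange(null, "b").apply(store).keySet());
        System.out.println(RangeSketch.<String>withRange(null, null).apply(store).keySet());
    }
}
```

Because the check is only "null or not", the same code works for any key type, which matches the point that there is no generic notion of an empty or blank key.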


FWIW, I'd be in favor of this KIP as proposed.

A couple of smaller notes:

3. In the compatibility notes, I wasn't sure what "web request" was
referring to. I think you just mean that all existing valid API calls will
continue to work the same, and we're only making the withRange method more
permissive with its arguments.

4. For the Test Plan, I wrote some tests that validate these queries
against every kind and configuration of store possible. Please add your new
test cases to that one to make absolutely sure it'll work for every store.
Obviously, you may also want to add some specific unit tests in addition.

See
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java


Thanks again!
-John

On 6/21/23 12:00, Kirk True wrote:

Hi Lucia,
One question:
1. Since the proposed implementation change for withRange() method uses
Optional.ofNullable() (which only catches nulls and not blank/whitespace
strings), wouldn’t users still need to have code like that in the example?
2. Why don't users create RangeQuery objects that use Optional
directly? What’s the benefit of introducing what appears to be a very thin
utility facade?

Thanks,
Kirk

On Jun 21, 2023, at 9:51 AM, Kirk True  wrote:

Hi Lucia,

Thanks for the KIP!

The KIP wasn’t in the email and I didn’t see it on the main KIP
directory. Here it is:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds

Can the KIP be added to the main KIP page
(https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)?
That will help with discoverability and encourage discussion.


Thanks,
Kirk


On Jun 15, 2023, at 2:13 PM, Lucia Cerchie wrote:


Hi everyone,

I'd like to discuss KIP-941, which will change the behavior of

range


Re: [DISCUSS] KIP-941 Support async runtimes in consumer

2023-06-29 Thread Matthias J. Sax

Seems the KIP number is 947, not 941?

Can you maybe start a new thread to avoid confusion?

Thanks.

On 6/28/23 1:11 AM, Erik van Oosten wrote:

Hello developers of the Java based consumer,

I submitted https://github.com/apache/kafka/pull/13914 to fix a long 
standing problem that the Kafka consumer on the JVM is not usable from 
asynchronous runtimes such as Kotlin co-routines and ZIO. However, since 
it extends the public API I was requested to create a KIP.


So here it is:
KIP-941 Support async runtimes in consumer 
https://cwiki.apache.org/confluence/x/chw0Dw


Any questions, comments, ideas and other additions are welcome!

The KIP should be complete except for the testing section. As far as I 
am aware there are no tests for the current behavior. Any help in this 
area would be appreciated.


Kind regards,
     Erik.




[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-06-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738782#comment-17738782
 ] 

Matthias J. Sax commented on KAFKA-15116:
-

> Message A uses an internal store to store information about the entity.  The 
> store knows that there is a pending event that is yet to be committed so it 
> blocks until it is committed. 

Are you saying that this happens in a background thread that you start 
yourself? If yes, it is a non-supported pattern, and we cannot give any 
guarantee about the behavior of the system. If there is no background thread, 
that blocking would imply that `StreamThread` blocks (also something you should 
not do, as it would imply that the thread drops out of the consumer group after 
`max.poll.interval.ms` passed), and thus, how could message B get processed? Or 
is this internal store that you mentioned shared across `StreamThreads` (this 
would also be an anti-pattern, and we cannot give any guarantee how the system 
behaves if you do this)?

> The store knows that there is a pending event that is yet to be committed so 
> it blocks until it is committed.

I am also wondering what you exactly mean by "committed" (it's a highly 
overloaded term, so it would be good to clarify). In Kafka itself, there could 
be two meanings: for at-least-once-processing "committing" means to commit the 
input topic offsets and mark the input records as processed. For 
exactly-once-processing "committing" means to commit the Kafka TX, ie, 
committing the result record into the output topic plus committing the input 
topic offset to mark the input records as processed. Not sure which one you 
mean, or if you actually refer to some mechanism to commit into your internal 
store?
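
For reference, the two Kafka-level meanings correspond to the Kafka Streams `processing.guarantee` setting. The following is a minimal sketch using plain `java.util.Properties` and the literal config key; the class name and the application id are made-up placeholders, not anything from the reporter's setup:

```java
import java.util.Properties;

// Illustrative helper; the class and method names are assumptions.
final class GuaranteeConfigSketch {
    // at_least_once: "committing" means committing the input topic offsets.
    static Properties atLeastOnce() {
        Properties props = new Properties();
        props.put("application.id", "example-app"); // placeholder id
        props.put("processing.guarantee", "at_least_once");
        return props;
    }

    // exactly_once_v2: "committing" means committing the Kafka transaction,
    // i.e. the output records and the input offsets together.
    static Properties exactlyOnce() {
        Properties props = atLeastOnce();
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }
}
```

Neither setting covers a commit into an application-managed store, which would be a third, separate mechanism.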

I guess I still don't understand the overall end-to-end workflow of your 
program.

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a messages, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Kafka Streaming: RocksDbSessionBytesStoreSupplier seems lost data in Kubernetes

2023-06-29 Thread Matthias J. Sax
The class `RocksDbSessionBytesStoreSupplier` is in package `internal` 
and thus, you should not use it directly. Instead, you should use the 
public factory class `org.apache.kafka.streams.state.Stores`.


However, your usage seems correct in general.

Not sure why you pass in the supplier directly though? In the end, if 
you want to set a name for the store, you can use 
`Materialized.as("...")`, and you can set the retention time via 
`Materialized#withRetention(...)` (which would be the proper usage of the 
API).


Besides this, the store should be backed by a changelog topic and thus 
you should never lose any data, independent of your deployment.


Of course, I would recommend to use a stateful set and re-attach storage 
to the pod to avoid re-creating the store from the changelog.


HTH,

-Matthias


On 6/28/23 8:49 AM, An, Hongguo (CORP) wrote:

Hi:
I am using RocksDbSessionBytesStoreSupplier in my kafka streaming application 
for an aggregation like this:


var materialized =
    Materialized.>as(
            new RocksDbSessionBytesStoreSupplier(
                env.getProperty("messages.cdc.pft.topic", "NASHCM.PAYROLL.PFT.FILENUMBER"),
                Duration.parse(env.getProperty("pft.duration", "P7D")).toMillis()))
        .withKeySerde(stringSerde)
        .withValueSerde(listSerde);

stream.windowedBy(SessionWindows
        .with(Duration.parse(env.getProperty("pft.gap", "PT0.1S")))
        .grace(Duration.parse(env.getProperty("pft.duration", "PT0.05S"))))
    .aggregate(ArrayList::new,
        (k, v, list) -> { list.add(v); return list; },
        (k, list1, list2) -> { list1.addAll(list2); return list1; },
        materialized)
    .toStream().foreach((key, value) -> {
        // sometimes value is null, but this should never happen – and we do see
        // some messages not processed.
    });



The application runs on Kubernetes, should we not use 
RocksDbSessionBytesStoreSupplier?



Thanks

Andrew





[jira] [Commented] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-06-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738231#comment-17738231
 ] 

Matthias J. Sax commented on KAFKA-13973:
-

The GitHub issue for RocksDB was "declined". I did file a follow-up ticket for 
Speedb ([https://github.com/speedb-io/speedb/issues/583]) – maybe we get help 
there.

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metrics show a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and I saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close this one 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the RocksDB instance, it seems that the column family is 
> not deleted completely and the metrics exposed by [RocksDB continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // try fix it
>     }
>     noTimestampsIter.close();
> }
> {code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we have the real block-cache-capacity metrics value in Kafka 
> Streams monitoring?*





[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-06-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736637#comment-17736637
 ] 

Matthias J. Sax commented on KAFKA-15116:
-

> The complexity is that there is a requirement that all events with the same 
> partition key must be committed before the next message  is processed.

Not sure if I understand this requirement. Can you elaborate? The input is an 
infinite stream. So if you get, let's say 3 messages all with key A, how do you 
know that there is no 4th message with key A? – Also, in Kafka Streams you 
cannot really control when a commit happens to begin with.

> where the first message blocks the second message during a rebalance because 
> the first message isn’t committed before the second message is processed

Also not sure what this means? If a rebalance is triggered, all pending 
messages will be flushed out, the offset will be committed.

> This ultimately results in transactions timing out and more rebalancing.

Kafka Streams manages transactions under the hood for you. So you don't know 
when a TX starts or ends. How can you reason about it?

> We’ve now put in a temporary fix 

Can you give more details?

> A rebalance is in progress so streams is no longer able to commit.

If a rebalance is triggered, KS should first commit before the rebalance goes 
into "in progress" state – and thus, it should not be necessary to commit (it 
was already done).

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a messages, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed





[jira] [Commented] (KAFKA-15108) task.timeout.ms does not work when TimeoutException is thrown by streams producer

2023-06-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17735476#comment-17735476
 ] 

Matthias J. Sax commented on KAFKA-15108:
-

There are a few cases for which we cannot handle a `TimeoutException` more 
gracefully, and the docs gloss over this fact. – The scenario you describe is 
one of these cases.

I agree that we should maybe try to include it – the challenge (and why it was 
not included in the original work) is that it will need different handling 
compared to how we handle `TimeoutException` for the regular case...

> task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
> -
>
> Key: KAFKA-15108
> URL: https://issues.apache.org/jira/browse/KAFKA-15108
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: Tomonari Yamashita
>Priority: Major
>
> [Problem]
>  - task.timeout.ms does not work when TimeoutException is thrown by streams 
> producer
>  -- Kafka Streams upgrade guide says, "Kafka Streams is now handling 
> TimeoutException thrown by the consumer, producer, and admin client."(1) and 
> "To bound how long Kafka Streams retries a task, you can set task.timeout.ms 
> (default is 5 minutes)."(1).
>  -- However, it doesn't look like task.timeout.ms is working for the streams 
> producer, then it seems to keep retrying forever.
> [Environment]
>  - Kafka Streams 3.5.0
> [Reproduce procedure]
>  # Create "input-topic" topic
>  # Put several messages on "input-topic"
>  # Do NOT create the "output-topic" topic, to trigger a TimeoutException
>  # Create the following simple Kafka streams program; this program just 
> transfers messages from "input-topic" to "output-topic".
>  -- 
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "java-kafka-streams");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serdes$StringSerde");
> props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,"com.example.CustomProductionExceptionHandler"); // not needed
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
> .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
> KafkaStreams streams = new KafkaStreams(builder.build(), props);
> {code}
>  # Wait for task.timeout.ms (default is 5 minutes).
>  ## If the debug log is enabled, a large number of 
> UNKNOWN_TOPIC_OR_PARTITIONs will be logged because "output-topic" does not 
> exist.
>  ## And every one minute, TimeoutException will be generated (2)
>  # ==> However, it doesn't look like task.timeout.ms is working for the 
> streams producer, then it seems to keep retrying forever.
>  ## My expected behavior is that task.timeout.ms is working, and the client 
> will be shut down because the default behavior is 
> StreamThreadExceptionResponse.SHUTDOWN_CLIENT when an exception is thrown.
> [As far as my investigation]
>  - TimeoutException thrown by the streams producer is replaced with 
> TaskCorruptedException in RecordCollectorImpl.recordSendError(...) (3)
>  - And after that it does not appear to be executing code that contains logic 
> related to task.timeout.ms.
> (1) Kafka Streams upgrade guide
> - [https://kafka.apache.org/35/documentation/streams/upgrade-guide]
> - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
> {code:java}
> Kafka Streams is now handling TimeoutException thrown by the consumer, 
> producer, and admin client. If a timeout occurs on a task, Kafka Streams 
> moves to the next task and retries to make progress on the failed task in the 
> next iteration. To bound how long Kafka Streams retries a task, you can set 
> task.timeout.ms (default is 5 minutes). If a task does not make progress 
> within the specified task timeout, which is tracked on a per-task basis, 
> Kafka Streams throws a TimeoutException (cf. KIP-572).
> {code}
> (2) TimeoutException occurs
> {code:java}
> 2023-06-19 19:51:26 WARN  NetworkClient:1145 - [Producer 
> clientId=java-kafka-strea

Re: [ANNOUNCE] New committer: Divij Vaidya

2023-06-13 Thread Matthias J. Sax

Congrats!

On 6/13/23 10:24 AM, Satish Duggana wrote:

Congratulations Divij!!

On Tue, 13 Jun 2023 at 22:41, Manyanda Chitimbo wrote:


Congratulations Divij.

On Tue 13 Jun 2023 at 17:50, Bruno Cadonna  wrote:


Hi all,

The PMC of Apache Kafka is pleased to announce a new Kafka committer
Divij Vaidya.

Divij's major contributions are:

GDPR compliance enforcement of kafka-site -
https://issues.apache.org/jira/browse/KAFKA-13868

Performance improvements:

Improve performance of VarInt encoding and decoding -
https://github.com/apache/kafka/pull/13312

Reduce data copy & buffer allocation during decompression -
https://github.com/apache/kafka/pull/13135

He also was heavily involved in the migration to Mockito.

Furthermore, Divij is very active on the mailing lists as well as in
maintaining and reviewing pull requests.

Congratulations, Divij!

Thanks,

Bruno (on behalf of the Apache Kafka PMC)


--

Manyanda Chitimbo.


Re: Consuming an entire partition with control messages

2023-06-13 Thread Matthias J. Sax

Sounds like a bug in aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer 
should step over it, and report the correct position after the marker.


The official KafkaConsumer (ie, the Java one), does the exact same thing.
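
To illustrate the expected behavior, here is a toy model of a partition log, not the Kafka client API: every name in it is an assumption, offsets equal the list index, and all transactions are assumed committed. Transaction markers are never returned by `poll`, but they still advance the position, so a restore loop can stop once the position reaches the end offset captured at the start:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a read_committed consumer on one partition; NOT the Kafka API.
final class ControlMarkerSketch {
    // One log entry; control == true models a transaction marker.
    static final class Entry {
        final long offset;
        final boolean control;
        Entry(long offset, boolean control) {
            this.offset = offset;
            this.control = control;
        }
    }

    private final List<Entry> log;
    private long position = 0; // next offset to read

    ControlMarkerSketch(List<Entry> log) { this.log = log; }

    long endOffset() { return log.size(); }
    long position() { return position; }

    // Returns up to maxRecords data offsets. Markers are skipped,
    // but the position still steps over them.
    List<Long> poll(int maxRecords) {
        List<Long> records = new ArrayList<>();
        while (position < log.size() && records.size() < maxRecords) {
            Entry e = log.get((int) position);
            if (!e.control) {
                records.add(e.offset);
            }
            position = e.offset + 1;
        }
        return records;
    }

    // Restore loop: stop when the position reaches the end offset seen at start.
    static List<Long> restore(ControlMarkerSketch consumer, int maxRecords) {
        long end = consumer.endOffset();
        List<Long> restored = new ArrayList<>();
        while (consumer.position() < end) {
            restored.addAll(consumer.poll(maxRecords));
        }
        return restored;
    }
}
```

In this model the last `poll` returns no records while still moving the position past the marker, which is why comparing the position against the end offset terminates cleanly.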


-Matthias

On 5/30/23 8:41 AM, Vincent Maurin wrote:

Hello !

I am working on an exactly-once stream processor in Python, using the
aiokafka client library. My program stores a state in memory, which is
recovered from a changelog topic, like in Kafka Streams.

On each processing loop, I am consuming messages, then producing messages
to an output topic and to my changelog topic, within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine consuming the changelog topic from the beginning to the
"end" with a read_committed isolation level. Here I am struggling to
define when to stop my recovery:
* my current (maybe) working solution is to loop over "poll" until
poll is not returning any messages anymore
* I tried something more based on the end offsets and checking the
consumer position, but with control messages at the end of the
partition, I am running into an issue where the position is one below
the end offset and doesn't go further

I had a quick look to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on here

Best regards,
Vincent


Re: [DISCUSS] KIP-923: Add A Grace Period to Stream Table Join

2023-05-25 Thread Matthias J. Sax
he context of versioned stores refers to how far back in time
out-of-order writes may occur, which probably isn't directly relevant for
introducing a stream-side buffer, though it's also possible I've overlooked
something. (As a bonus, switching from "table grace period" in the KIP to
"table history retention" also helps to clarify/distinguish that it's a
different parameter from the "join grace period," which I could see being
confusing to readers. :) )


Cheers,
Victoria

On Thu, May 18, 2023 at 1:43 PM Walker Carlson wrote:


Hey all,

Thanks for the comments, they gave me a lot to think about. I'll try to
address them all in order. I have made some updates to the KIP related to
them, and I mention where below.

Lucas

Good idea about the example. I added a simple one.

1) I have thought about including options for the underlying buffer
configuration. One of which might be adding an in-memory option. My biggest
concern is about the semantic guarantees. This isn't like suppress or with
windows where producing incomplete results is relatively harmless. Here
we would be possibly producing incorrect results. I also would like to keep
the interface changes as simple as I can. Making more than this change to
Joined I feel could make this more complicated than it needs to be. If we
really want to I could see adding a grace() option with a BufferConfig in
there or something, but I would rather not.

2) The buffer will be independent of if the table is versioned or not. If
table is not materialized it will materialize it as versioned. It might
make sense to do a follow up kip where we force the retention period  of
the versioned to be greater than whatever the max of the stream buffer is.

Victoria

1) Yes, records will exit in timestamp order not in offset order.
2) Late records will be dropped (Late as out of the grace period). From my
understanding that is the point of a grace period, no? Doesn't the same
thing happen with versioned stores?
3) The segment store already has an observed stream time, we advance based
on that. That should only advance based on records that enter the store. So
yes, only stream side records. We could maybe do an improvement later to
advance stream time from table side as well, but that might be debatable as
we might get more late records. Anyways I would rather have that as a
separate discussion.
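
The semantics in points 1) to 3) can be sketched with a toy buffer. This is a simplified model under stated assumptions, not the KIP-923 implementation: the class and method names are hypothetical, stream time is the maximum timestamp seen on the stream side, a record is late if its timestamp is below stream time minus grace, and buffered records are released in timestamp order once stream time has moved at least the grace period past them:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of a stream-side grace-period buffer; NOT the KIP-923 code.
final class GraceBufferSketch {
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // max stream-side timestamp seen
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();
    private int dropped = 0;

    GraceBufferSketch(long graceMs) { this.graceMs = graceMs; }

    // Adds a record and returns the values whose grace period has elapsed,
    // ordered by timestamp rather than by arrival (offset) order.
    List<String> add(long timestamp, String value) {
        streamTime = Math.max(streamTime, timestamp);
        long cutoff = streamTime - graceMs;
        if (timestamp < cutoff) {
            dropped++; // late: arrived after its grace period passed
            return new ArrayList<>();
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);

        // View of all buffered timestamps <= cutoff; clearing it evicts them.
        Map<Long, List<String>> expired = buffer.headMap(cutoff, true);
        List<String> ready = new ArrayList<>();
        for (List<String> values : expired.values()) {
            ready.addAll(values);
        }
        expired.clear();
        return ready;
    }

    int droppedCount() { return dropped; }
}
```

With a grace period of 10, adding timestamps 100, 98 and then 115 releases the records for 98 and 100 in that order on the third call, and a subsequent record with timestamp 90 is dropped as late.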

In-memory option? We can do that; for the buffer I plan to use the
TimeOrderedKeyValueBuffer interface, which already has an in-memory
implementation, so it would be simple.

I said more in my answer to Lucas's question. The concern I have with
buffer configs or in-memory is complicating the interface. Also semantic
guarantees, but in-memory shouldn't affect that.

Matthias

1) fixed out of order vs late terminology in the kip.

2) I was referring to having a stream. So after this kip we can have a
buffered stream or a normal one. For the table we can use a versioned table
or a normal table.

3) Good callout. I clarified this as "If the table side uses a materialized
version store, it can store multiple versions of each record within its
defined grace period." and modified the rest of the paragraph a bit.

4) I get the preserving of offset ordering, but if the stream is buffered
to join on timestamp instead of offset doesn't it already seem like we care
more about time in this case?

If we end up adding more options it might make sense to do this. Maybe
offset order processing can be a follow up?

I'll add a section for this in Rejected Alternatives. I think it makes
sense to do something like this but maybe in a follow up.

5) I hadn't thought about this. I suppose if they changed this in an
upgrade the next record would either evict a lot of records (if the grace
period decreased) or there would be a pause until the new grace period
reached. Increasing is a bit more problematic, especially if the table
grace period and retention time stays the same. If the data is reprocessed
after a change like that then there would be different results, but I feel
like that would be expected after such a change.

What do you think should happen?

Hopefully this answers your questions!

Walker

On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax  wrote:


Thanks for the KIP! Also some question/comments from my side:

10) Notation: you use the term "late data" but I think you mean
out-of-order. We reserve the term "late" to records that arrive after
grace period passed, and thus, "late == out-of-order data that is
dropped".



20) "There is only one option from the stream side and only recently is
there a second option on the table side."

What are those options? Victoria already asked about the table side, but
I am also not sure what option you mean for the stream side?


30) "If the table side uses a materialized version store the value is
the latest by stream time rather than by offset within it

[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-05-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725357#comment-17725357
 ] 

Matthias J. Sax commented on KAFKA-7497:


Seems to be fixed. Cf https://issues.apache.org/jira/browse/KAFKA-14209 

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.





[jira] [Resolved] (KAFKA-7497) Kafka Streams should support self-join on streams

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7497.

Resolution: Fixed

> Kafka Streams should support self-join on streams
> -
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams 
> does not currently support this ({{Invalid topology: Topic foo has already 
> been registered by another source.}}).  To perform the join requires creating 
> a second stream as a clone of the first, and then doing a join between the 
> two. This is a clunky workaround and results in unnecessary duplication of 
> data.





[jira] [Resolved] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14173.
-
Resolution: Not A Problem

> TopologyTestDriver does not use mock wall clock time when sending test records
> --
>
> Key: KAFKA-14173
> URL: https://issues.apache.org/jira/browse/KAFKA-14173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.1
>Reporter: Guido Josquin
>Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal 
> is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
> {
>   case (billValue, null) => billValue
>   case (billValue, paymentValue) => (billValue.toInt - 
> paymentValue.toInt).toString
> },
> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>  
> In other words, if we see a `bill` and a `payment` within 100ms, the payment 
> should be subtracted from the bill. If we do not see a payment, the debt is 
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, 
> serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, 
> serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, 
> serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> It seems that advancing the wall clock time, sleeping, or sending an extra 
> record never triggers the join condition when data only arrives on the left 
> side. It is possible to circumvent this by passing an explicit event time 
> with each test record. (See 
> https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
>  
> However, the behavior deviates from a real Kafka broker. With a real broker, 
> if we do not send an event, it uses the wall clock time of the broker 
> instead. The behavior under test should be the same: 
> `driver.advanceWallClockTime` should provide the default time to be used for 
> `TestInputTopic.pipeInput` when no other time is specified.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

2023-05-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725356#comment-17725356
 ] 

Matthias J. Sax commented on KAFKA-14173:
-

Just discovering this ticket.

I guess you would need to use `TestInputTopic#advanceTime` for this case?

Closing the ticket as "not an issue", as the API is there. Feel free to follow 
up.
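
For reference, a minimal sketch of what using `TestInputTopic#advanceTime` 
could look like for the test in this ticket. It assumes the left-join topology 
from the description is passed in as `topology`; the helper name 
`runLeftJoinTest` is made up for illustration, and the sketch does not claim 
that the output matches the expectation stated in the ticket.

```scala
import java.time.Duration
import java.util.{List => JList}

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{KeyValue, Topology, TopologyTestDriver}

// Hypothetical helper: `topology` is assumed to be the bills/payments
// left-join topology described in the ticket.
def runLeftJoinTest(topology: Topology): JList[KeyValue[String, String]] = {
  val driver = new TopologyTestDriver(topology)
  try {
    val bills = driver.createInputTopic("bills", new StringSerializer, new StringSerializer)
    val payments = driver.createInputTopic("payments", new StringSerializer, new StringSerializer)
    val debt = driver.createOutputTopic("debt", new StringDeserializer, new StringDeserializer)

    bills.pipeInput("fred", "100")
    bills.pipeInput("george", "20")
    payments.pipeInput("fred", "95")

    // Move the event time tracked by the `payments` test topic forward,
    // instead of advancing wall clock time or sleeping.
    payments.advanceTime(Duration.ofMillis(500))

    // This record gets the advanced timestamp, which moves stream time
    // past the 100ms join window of the earlier records.
    payments.pipeInput("percy", "0")

    debt.readKeyValuesToList()
  } finally {
    driver.close()
  }
}
```

Another option is the `createInputTopic` overload that takes a start timestamp 
and an auto-advance duration, so that every piped record gets a later 
timestamp than the previous one without explicit `advanceTime` calls.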



[jira] [Resolved] (KAFKA-14173) TopologyTestDriver does not use mock wall clock time when sending test records

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14173.
-
Resolution: Not A Problem



[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10575:

Labels: kip  (was: )

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: kip
> Fix For: 3.5.0
>
>
> Part of KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility]
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, whenever the restoration 
> stops, in any scenario.
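
To illustrate the callbacks discussed in this ticket, here is a minimal sketch 
of a listener that only logs what it is told. The class name 
`LoggingRestoreListener` is hypothetical; the three overridden methods are the 
existing `StateRestoreListener` callbacks. KIP-869 describes the additional 
signal for a restoration that stops early; see the KIP page for its exact 
name and signature, which this sketch does not assume.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.processor.StateRestoreListener

// Hypothetical listener for illustration; it prints restoration progress.
class LoggingRestoreListener extends StateRestoreListener {

  // Called once when restoration of a store partition begins.
  override def onRestoreStart(partition: TopicPartition, storeName: String,
                              startingOffset: Long, endingOffset: Long): Unit =
    println(s"restore of $storeName ($partition) started: $startingOffset to $endingOffset")

  // Called after each restored batch.
  override def onBatchRestored(partition: TopicPartition, storeName: String,
                               batchEndOffset: Long, numRestored: Long): Unit =
    println(s"restored $numRestored records of $storeName ($partition) up to $batchEndOffset")

  // Per this ticket, currently only called when an active task completes
  // restoration, not when the restoring task is closed before that.
  override def onRestoreEnd(partition: TopicPartition, storeName: String,
                            totalRestored: Long): Unit =
    println(s"restore of $storeName ($partition) finished: $totalRestored records")
}
```

Such a listener would be registered with 
`KafkaStreams#setGlobalStateRestoreListener` before the instance is started.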





[jira] [Updated] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10575:

Description: 
Part of KIP-869: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility]

Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete the 
restoration of an active task and transition it to the running state. However, 
the restoration can also be stopped when the restoring task gets closed (because 
it gets migrated to another client, for example). We should also trigger the 
callback, indicating its progress, whenever the restoration stops, in any 
scenario.

  was:Today we only trigger `StateRestoreListener#onRestoreEnd` when we 
complete the restoration of an active task and transition it to the running 
state. However, the restoration can also be stopped when the restoring task gets 
closed (because it gets migrated to another client, for example). We should also 
trigger the callback, indicating its progress, whenever the restoration stops, 
in any scenario.


> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.5.0
>
>
> Part of KIP-869: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility]
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, whenever the restoration 
> stops, in any scenario.





[jira] [Resolved] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10575.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 3.5.0
>
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, whenever the restoration 
> stops, in any scenario.



[jira] [Assigned] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2023-05-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10575:
---

Assignee: Guozhang Wang  (was: highluck)

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transition it to the running state. 
> However, the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback, indicating its progress, whenever the restoration 
> stops, in any scenario.





Re: [VOTE] 3.4.1 RC0

2023-05-22 Thread Matthias J. Sax

Thanks a lot!

-Matthias

On 5/21/23 7:27 PM, Luke Chen wrote:

Hi Matthias,

Yes, I agree we should get this hotfix into 3.4.1.
I've backported into the 3.4 branch.
I'll create a new RC for 3.4.1.

Thanks.
Luke

On Mon, May 22, 2023 at 5:13 AM Matthias J. Sax  wrote:


Hi Luke,

RC0 for 3.4.1 includes a fix for
https://issues.apache.org/jira/browse/KAFKA-14862. We recently
discovered that the fix itself introduces a regression. We already have
a PR to fix-forward the regression:
https://github.com/apache/kafka/pull/13734

I think we should get the open PR merged, and backport it not only to 3.5,
but also to 3.4.1, and get a new RC for 3.4.1.

Thoughts?


-Matthias


On 5/19/23 6:12 AM, Josep Prat wrote:

Hi Luke,
This gets a +1 from my end. I believe non-binding because if I understand
it correctly, binding votes for releases are only issued by PMCs (
https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses
).

I did the following validations:
- Verified signatures and checksums for all the generated artifacts
- Built from source with Java 11 and Scala 2.13.10
- Ran unit tests
- Ran integration tests
- Ran the quickstart with Zookeeper and KRaft

Best,

On Wed, May 17, 2023 at 2:11 PM Josep Prat  wrote:


Hi Luke,

I ran the tests from the source package you created and I didn't get any
of the test failures you had on your CI build. I got other flaky tests
though, that after being run in isolation ran successfully. I'll try to run
signature validation, and some further testing later today or later this
week.

Best,

On Wed, May 17, 2023 at 12:43 PM Federico Valeri 
wrote:


Hi Luke, thanks for running the release.

Looks like the Maven artifacts are not in staging:

https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka-clients/3.4.1/

Documentation still has 3.4.0, instead of 3.4.1 (not sure if this will
be aligned later):
https://kafka.apache.org/34/documentation.html#producerapi

Br
Fede


On Wed, May 17, 2023 at 5:24 AM Luke Chen  wrote:


Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 3.4.1.

This is a bugfix release with several fixes since the release of 3.4.0. A
few of the major issues include:
- core
KAFKA-14644 <https://issues.apache.org/jira/browse/KAFKA-14644> Process should stop after failure in raft IO thread
KAFKA-14946 <https://issues.apache.org/jira/browse/KAFKA-14946> KRaft controller node shutting down while renouncing leadership
KAFKA-14887 <https://issues.apache.org/jira/browse/KAFKA-14887> ZK session timeout can cause broker to shutdown
- client
KAFKA-14639 <https://issues.apache.org/jira/browse/KAFKA-14639> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
- connect
KAFKA-12558 <https://issues.apache.org/jira/browse/KAFKA-12558> MM2 may not sync partition offsets correctly
KAFKA-14666 <https://issues.apache.org/jira/browse/KAFKA-14666> MM2 should translate consumer group offsets behind replication flow
- stream
KAFKA-14172 <https://issues.apache.org/jira/browse/KAFKA-14172> bug: State stores lose state when tasks are reassigned under EOS

Release notes for the 3.4.1 release:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by May 24, 2023
Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~showuon/kafka-3.4.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/javadoc/

* Tag to be voted upon (off 3.4 branch) is the 3.4.1 tag:
https://github.com/apache/kafka/releases/tag/3.4.1-rc0

* Documentation:
https://kafka.apache.org/34/documentation.html

* Protocol:
https://kafka.apache.org/34/protocol.html

The most recent build has had test failures. These all appear to be due to
flakiness, but it would be nice if someone more familiar with the failed
tests could confirm this. I may update this thread with passing build links
if I can get one, or start a new release vote thread if test failures must
be addressed beyond re-running builds until they pass.

Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/133/

System tests:
Will update the results later

Thank you.
Luke





--
[image: Aiven] <https://www.aiven.io>

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io <https://www.aiven.io>   |   <https://www.facebook.com/aivencloud>
<https://www.linkedin.com/company/aiven/>   <https://twitter.com/aiven_io>

*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B










Re: [VOTE] 3.4.1 RC0

2023-05-21 Thread Matthias J. Sax

Hi Luke,

RC0 for 3.4.1 includes a fix for 
https://issues.apache.org/jira/browse/KAFKA-14862. We recently 
discovered that the fix itself introduces a regression. We already have 
a PR to fix-forward the regression: 
https://github.com/apache/kafka/pull/13734


I think we should get the open PR merged, and backport it not only to 3.5, 
but also to 3.4.1, and get a new RC for 3.4.1.


Thoughts?


-Matthias


Re: [VOTE] 3.4.1 RC0

2023-05-21 Thread Matthias J. Sax

Hi Luke,

RC0 for 3.4.1 includes a fix for 
https://issues.apache.org/jira/browse/KAFKA-14862. We recently 
discovered that tge fix itself introduces a regression. We have already 
a PR to fix-forward the regression: 
https://github.com/apache/kafka/pull/13734


I think we should get the open PR merged, and back part not only to 3.5, 
but also to 3.4.1, and get a new RC for 3.4.1.


Thoughts?


-Matthias


On 5/19/23 6:12 AM, Josep Prat wrote:

Hi Luke,
This gets a +1 from my end. I believe non-binding because if I understand
it correctly, binding votes for releases are only issued by PMCs (
https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses
).

I did the following validations:
- Verified signatures and checksums for all the generated artifacts
- Built from source with Java 11 and Scala 2.13.10
- Run unit tests
- Run integration tests
- Run the quickstart with Zookeeper and KRaft

Best,

On Wed, May 17, 2023 at 2:11 PM Josep Prat  wrote:


Hi Luke,

I ran the tests from the source package you created and I didn't get any
of the test failures you had on your CI build. I got other flaky tests
though, that after being run in isolation ran successfully. I'll try to run
signature validation, and some further testing later today or later this
week.

Best,

On Wed, May 17, 2023 at 12:43 PM Federico Valeri 
wrote:


Hi Luke, thanks for running the release.

Looks like the Maven artifacts are not in staging:

https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka-clients/3.4.1/

Documentation still has 3.4.0, instead of 3.4.1 (not sure if this will
be aligned later):
https://kafka.apache.org/34/documentation.html#producerapi

Br
Fede


On Wed, May 17, 2023 at 5:24 AM Luke Chen  wrote:


Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 3.4.1.

This is a bugfix release with several fixes since the release of 3.4.0.

A

few of the major issues include:
- core
KAFKA-14644  Process
should stop after failure in raft IO thread
KAFKA-14946  KRaft
controller node shutting down while renouncing leadership
KAFKA-14887  ZK

session

timeout can cause broker to shutdown
- client
KAFKA-14639  Kafka
CooperativeStickyAssignor revokes/assigns partition in one rebalance

cycle

- connect
KAFKA-12558  MM2

may not

sync partition offsets correctly
KAFKA-14666  MM2

should

translate consumer group offsets behind replication flow
- stream
KAFKA-14172  bug:

State

stores lose state when tasks are reassigned under EOS

Release notes for the 3.4.1 release:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by May 24, 2023
Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~showuon/kafka-3.4.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/javadoc/

* Tag to be voted upon (off 3.4 branch) is the 3.4.1 tag:
https://github.com/apache/kafka/releases/tag/3.4.1-rc0

* Documentation:
https://kafka.apache.org/34/documentation.html

* Protocol:
https://kafka.apache.org/34/protocol.html

The most recent build has had test failures. These all appear to be due to
flakiness, but it would be nice if someone more familiar with the failed
tests could confirm this. I may update this thread with passing build links
if I can get one, or start a new release vote thread if test failures must
be addressed beyond re-running builds until they pass.

Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/133/

System tests:
Will update the results later

Thank you.
Luke





--
*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io

*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B






Re: [DISCUSS] Apache Kafka 3.5.0 release

2023-05-19 Thread Matthias J. Sax
pache.org/jira/browse/KAFKA-14840

(nearly done)

I just wanted to check with you before cherry-picking these to 3.5


David


On Mon, Apr 24, 2023 at 1:18 PM Mickael Maison <mickael.mai...@gmail.com>
wrote:

Hi Justine,

That makes sense. Feel free to revert that commit in 3.5.


Thanks,
Mickael

On Mon, Apr 24, 2023 at 7:16 PM Mickael Maison <mickael.mai...@gmail.com>
wrote:


Hi Josep,

Thanks for letting me know!

On Mon, Apr 24, 2023 at 6:58 PM Justine Olshan wrote:

Hey Mickael,

I've just opened a blocker to revert KAFKA-14561 in 3.5. There are a few
blocker bugs that I don't think I can fix before the code freeze, so I
think for the quality of the release, we should just revert the commit.


Thanks,
Justine

On Fri, Apr 21, 2023 at 1:23 PM Josep Prat wrote:

Hi Mickael,

Greg Harris managed to fix a flaky test in
https://github.com/apache/kafka/pull/13575, I cherry-picked it to the 3.5
(and 3.4) branch. I updated the Jira to reflect that it is now fixed on
3.5.0 as well as 3.6.0.
Let me know if I forgot anything.

Best,

On Fri, Apr 21, 2023 at 3:44 PM Mickael Maison <mickael.mai...@gmail.com>
wrote:

Hi,

Just a quick reminder that code freeze is next week. We still have 27 JIRAs
targeting 3.5 [0] including quite a few bugs and flaky test issues opened
recently. If you have time, take one of these items or help with the
reviews.

I'll send another update next once we've entered code freeze.

0: https://issues.apache.org/jira/browse/KAFKA-13421?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%203.5.0%20AND%20status%20not%20in%20(resolved%2C%20closed)%20ORDER%20BY%20priority%20DESC%2C%20status%20DESC%2C%20updated%20DESC


Thanks,
Mickael

On Thu, Apr 20, 2023 at 9:14 PM Mickael Maison <mickael.mai...@gmail.com>
wrote:

Hi Ron,

Yes feel free to merge that fix. Thanks for letting me know!


Mickael

On Thu, Apr 20, 2023 at 8:15 PM Ron Dagostino <rndg...@gmail.com> wrote:

Hi Mickael.  I would like to merge
https://github.com/apache/kafka/pull/13532 (KAFKA-14887: No shutdown for ZK
session expiration in feature processing) to the 3.5 branch. It is a very
small and focused fix for an issue that can cause unexpected broker
shutdowns when there is instability in the connectivity to ZooKeeper.

The risk is very low.

Ron


On Tue, Apr 18, 2023 at 9:57 AM Mickael Maison <mickael.mai...@gmail.com>
wrote:

Hi David,

Thanks for the update. I've marked KAFKA-14869 as fixed in 3.5.0, I guess
you'll only resolve this ticket once you merge the backports to earlier
branches. The ticket will have to be resolved to run the release but that
should leave you enough time.


Thanks,
Mickael

On Tue, Apr 18, 2023 at 3:42 PM David Jacot wrote:

Hi Mickael,

FYI - I just merged the two PRs for KIP-915 to trunk/3.5. We are all good.


Cheers,
David

On Mon, Apr 17, 2023 at 5:10 PM Mickael Maison <mickael.mai...@gmail.com>
wrote:

Hi Chris,

I was looking at that just now! As you said, the PRs merged provide some
functionality so I think it's fine to deliver the KIP across 2 releases.
I left a comment in https://issues.apache.org/jira/browse/KAFKA-14876
to document what's in 3.5.

Thanks,
Mickael


On Mon, Apr 17, 2023 at 5:05 PM Chris Egerton wrote:

Hi Mickael,

It looks like we missed the feature freeze cutoff for part but not all of
KIP-875 [1]. The features we've been able to merge so far are the new
STOPPED state for connectors [2] and the API for reading offsets [3]. The
features we have not been able to merge yet are the APIs for altering and
resetting offsets.

The already-merged features are useful on their own and I believe it should
be acceptable to release them separately from the not-yet-merged ones, but
wanted to double-check with you that it's alright to split this KIP across
two or more releases, starting with 3.5.0.


Cheers,

Chris

[1] - https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
[2] - https://github.com/apache/kafka/pull/13424
[3] - https://github.com/apache/kafka/pull/13434


On Fri, Apr 14, 2023 at 10:13 AM Matthias J. Sax <mj...@apache.org> wrote:



Thanks a lot!

On 4/14/23 5:32 AM, Mickael Maison wrote:

Hi Matthias,

I merged the PR before cutting the 3.5 branch.


Thanks,
Mickael

On Fri, Apr 14, 2023 at 2:31 PM Mickael Maison <mickael.mai...@gmail.com>
wrote:

Hi David,

I've created the 3.5 branch. Feel free to cherry pick these 2 commits
when they are ready.

Thanks,
Mickael

On Fri, Apr 14, 2023 at 11:23 AM Satish Duggana <satish.dugg...@gmail.com>
wrote:

Thanks Luke for helping with the reviews and adding a few tests in a
couple of PRs.

Hi Mickae

Re: Query regarding implementation of KStreams with Hbase

2023-05-12 Thread Matthias J. Sax
Kafka Streams is designed to read from and write to a broker cluster. It's 
not designed to write data to a different system like HBase.


If you want to get data from Kafka to HBase, you should use Kafka Connect.

Of course, it's possible (but not recommended) to implement your own 
`Processor` and do whatever you want with the data inside Kafka Streams.


HTH.

-Matthias

On 5/11/23 10:38 AM, Rohit M wrote:

Hi team,

There is a lot on the internet where KStreams reads data from a topic,
performs some transformation and writes it back to a topic. But I wonder
whether writing to an HBase table is possible with KStream? What I mean by
this is that we should read data from a topic using KStream, perform some
operations like we do on a dataframe, and then write it to an HBase table.
I didn't find any resources on the internet implementing KStreams with
HBase. I would be glad if I could get some help with a piece of code,
preferably in Scala, to read from a topic or even an HBase table using a
KStream application, perform some transformation and write it to an HBase
table.


Regards
Rohit M



Re: Some questions on Kafka on order of messages with mutiple partitions

2023-05-12 Thread Matthias J. Sax

Does having  9 partitions with 9 replication factors make sense here?


A replication factor of 9 sounds very high. For production, a replication 
factor of 3 is recommended.


How many partitions you want/need is a different question, and cannot be 
answered in a general way.



"Yes" to all other questions.


-Matthias



On 5/12/23 9:50 AM, Mich Talebzadeh wrote:

Hi,

I have used Apache Kafka in conjunction with Spark as a messaging 
source. This rather dated diagram describes it


I have two physical hosts each 64 GB, running RHES 7.6, these are called 
rhes75 and rhes76 respectively. The Zookeeper version is 3.7.1 and kafka 
version is 3.4.0



I have a topic md -> MarketData that has been defined as below

kafka-topics.sh --create --bootstrap-server \
rhes75:9092,rhes75:9093,rhes75:9094,rhes76:9092,rhes76:9093,rhes76:9094,rhes76:9095,rhes76:9096,rhes76:9097 --replication-factor 9 --partitions 9 --topic md


kafka-topics.sh --describe --bootstrap-server \
rhes75:9092,rhes75:9093,rhes75:9094,rhes76:9092,rhes76:9093,rhes76:9094,rhes76:9095,rhes76:9096,rhes76:9097 --topic md



This is working fine

Topic: md  TopicId: UfQly87bQPCbVKoH-PQheg  PartitionCount: 9  ReplicationFactor: 9  Configs: segment.bytes=1073741824
    Topic: md  Partition: 0  Leader: 12  Replicas: 12,10,8,2,9,11,1,7,3  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 1  Leader: 9   Replicas: 9,8,2,12,11,1,7,3,10  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 2  Leader: 11  Replicas: 11,2,12,9,1,7,3,10,8  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 3  Leader: 1   Replicas: 1,12,9,11,7,3,10,8,2  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 4  Leader: 7   Replicas: 7,9,11,1,3,10,8,2,12  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 5  Leader: 3   Replicas: 3,11,1,7,10,8,2,12,9  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 6  Leader: 10  Replicas: 10,1,7,3,8,2,12,9,11  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 7  Leader: 8   Replicas: 8,7,3,10,2,12,9,11,1  Isr: 10,1,9,2,12,7,3,11,8
    Topic: md  Partition: 8  Leader: 2   Replicas: 2,3,10,8,12,9,11,1,7  Isr: 10,1,9,2,12,7,3,11,8


However, I have a number of questions

 1. Does having  9 partitions with 9 replication factors make sense here?
 2. As I understand it, the parallelism is equal to the number of partitions
for a topic.
 3. Kafka only provides a total order over messages *within a
partition*, not between different partitions in a topic, and in
this case I have one topic.
 4. Data within a partition will be stored in the order in which it is
written; therefore, data read from a partition will be read in order
for that partition?
 5. Finally, if I want to get messages in order across all 9
partitions, then I need to group messages with a key, so that
messages with the same key go to the same partition, and within that
partition the messages are ordered.
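
The key-based routing described in question 5 can be pictured with a small, self-contained sketch. It does not use the Kafka client at all: `partitionFor` is a hypothetical stand-in for the producer's partitioner (the real default partitioner hashes the serialized key with murmur2, not `String.hashCode`), and `route` simply appends records to per-partition lists in arrival order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyedOrderingSketch {
    // Hypothetical stand-in for the producer's partitioner: same key -> same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each {key, value} pair to the log of its partition, in arrival order.
    static Map<Integer, List<String>> route(List<String[]> records, int numPartitions) {
        Map<Integer, List<String>> logs = new LinkedHashMap<>();
        for (String[] record : records) {
            int partition = partitionFor(record[0], numPartitions);
            logs.computeIfAbsent(partition, p -> new ArrayList<>())
                .add(record[0] + "=" + record[1]);
        }
        return logs;
    }
}
```

Because every record with a given key lands in the same list, the relative order of that key's records is preserved; nothing is implied about the order between different partitions.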

Thanks






[jira] [Commented] (KAFKA-13349) Allow Iterator.remove on KeyValueIterator

2023-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722005#comment-17722005
 ] 

Matthias J. Sax commented on KAFKA-13349:
-

Yes, we want to add `remove()` to the interface 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java]
 and thus all implementations will need to support it.

> Allow Iterator.remove on KeyValueIterator
> -
>
> Key: KAFKA-13349
> URL: https://issues.apache.org/jira/browse/KAFKA-13349
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie++
>
> Today Stream's state store's range iterator does not support `remove`. We 
> could consider adding such support for all the built-in state stores:
> * RocksDB's native iterator does not support removal, but we can always do a 
> delete(key) concurrently while the iterator is open on the snapshot.
> * In-Memory: straight forward implementation.
> The benefit of that is then for range-and-delete truncation operation we do 
> not necessarily have to be cautious about concurrent modification exceptions. 
> This could also help GC with in-memory stores.
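
For the in-memory case, the range-and-delete idea from the ticket can be pictured with plain JDK collections. This is only a sketch under assumptions: a `TreeMap` stands in for an in-memory store, and `deleteRange` is a hypothetical helper, not part of the Kafka Streams API (`KeyValueIterator` does not support `remove()` at the time of this thread).

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;

final class RangeDeleteSketch {
    // Removes every entry with from <= key <= to and returns how many were removed.
    static int deleteRange(NavigableMap<String, String> store, String from, String to) {
        int removed = 0;
        Iterator<Map.Entry<String, String>> it =
            store.subMap(from, true, to, true).entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            // Removal goes through the iterator itself, so the backing map
            // does not throw ConcurrentModificationException.
            it.remove();
            removed++;
        }
        return removed;
    }
}
```

The point of the sketch is that the caller never touches the map while iterating, which is the benefit the ticket describes for range-and-delete truncation.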



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904

2023-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722004#comment-17722004
 ] 

Matthias J. Sax commented on KAFKA-14911:
-

No worries. And thanks for helping on reviewing! Equally important.

> Add system tests for rolling upgrade path of KIP-904
> 
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
>  Issue Type: Test
>Reporter: Farooq Qaiser
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0
>
>
> As per [~mjsax] comment 
> [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752],
>  we should add a system test to test the rolling upgrade path for 
> [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new 
> serialization format for groupBy internal repartition topics and was 
> implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446 
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf 
> `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test 
> variation. 
> The tricky thing about the test would be, to ensure that the repartition 
> topic is not empty when we do the bounce, so the test should be setup 
> accordingly.





[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722003#comment-17722003
 ] 

Matthias J. Sax commented on KAFKA-14981:
-

I was not aware that there were (or maybe still are) issues. Are there any 
tickets for them?

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if a instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set 
> this id in Kafka Streams using `threadId` so that no rebalance is triggered 
> within `session.timeout.ms`
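
As a rough illustration of the proposal, the consumer settings involved could be assembled as below. The config keys `group.id`, `group.instance.id` and `session.timeout.ms` are real consumer configs; the way the instance id is derived from an application id and a thread id is an assumption made for this sketch, not what Kafka Streams does.

```java
import java.util.Properties;

final class StaticMembershipSketch {
    // Builds consumer properties with a stable, per-thread instance id (hypothetical naming scheme).
    static Properties consumerProps(String applicationId, String threadId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("group.id", applicationId);
        // A stable id lets the broker treat a restarted consumer as the same static member.
        props.setProperty("group.instance.id", applicationId + "-" + threadId);
        // The restart must complete within this timeout to avoid a rebalance.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

The id only helps if it is the same before and after the restart, which is why the ticket discussion turns to whether the thread id is persisted.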





Re: apply for permission to contribute to Apache Kafka

2023-05-10 Thread Matthias J. Sax
I just checked permissions and you should be all set. Did you try to log 
out and log in again?


-Matthias

On 5/9/23 10:04 PM, Doe John wrote:

Thanks,

After obtaining permission, I want to assign this JIRA ticket to myself,
but there's no 「Assign」 button for me.

Is there any problem here?

Best Regards,
John Doe



On Wed, May 10, 2023 at 01:04, Luke Chen <show...@gmail.com> wrote:


Done.

Thanks.
Luke

On Sat, May 6, 2023 at 9:38 PM Doe John <zh2725284...@gmail.com> wrote:

 > my Jira ID: johndoe
 >
 > on email zh2725284...@gmail.com 
 >
 > Thanks!
 >



Re: Question ❓

2023-05-10 Thread Matthias J. Sax

Partitions are not for different users.

If you want to isolate users, you would do it at the topic level. You 
could use ACLs to grant access to different topics: 
https://kafka.apache.org/documentation/#security_authz



-Matthias

On 5/9/23 11:11 AM, влад тасканов wrote:


Hi. I recently started studying Kafka and a question came up. Is it possible 
to make a separate queue for each user? As I understand it, there is a broker 
with different topics, and each topic would have a number of partitions equal 
to the number of users. If yes, can you link to an example or explanation? 
Google didn't help me.


[jira] [Commented] (KAFKA-14981) Set `group.instance.id` in streams consumer so that rebalance will not happen if a instance is restarted

2023-05-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17721114#comment-17721114
 ] 

Matthias J. Sax commented on KAFKA-14981:
-

Very interesting idea – given that we persist the thread-id (aka process-id) in 
the state directory on local disk, it could help. And even if we don't persist 
it (because there is no local storage), it seems no harm would be done if the 
id changes every single time.

Wondering if we would need a KIP for this. My gut feeling is no, but not sure.

> Set `group.instance.id` in streams consumer so that rebalance will not happen 
> if a instance is restarted
> 
>
> Key: KAFKA-14981
> URL: https://issues.apache.org/jira/browse/KAFKA-14981
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> `group.instance.id` enables static membership so that if a consumer is 
> restarted within `session.timeout.ms`, a rebalance will not be triggered and 
> the original assignment can be returned directly from the broker. We can set 
> this id in Kafka Streams using `threadId` so that no rebalance is triggered 
> within `session.timeout.ms`





Re: [DISCUSS] KIP-923: Add A Grace Period to Stream Table Join

2023-05-08 Thread Matthias J. Sax

Thanks for the KIP! Also some questions/comments from my side:

10) Notation: you use the term "late data" but I think you mean 
out-of-order. We reserve the term "late" for records that arrive after 
the grace period has passed, and thus, "late == out-of-order data that is dropped".
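
The distinction in 10) can be written down as a tiny classifier. This is a sketch, not code from Kafka Streams: `LatenessSketch` and its names are hypothetical, and the exact boundary conditions (inclusive vs. exclusive comparisons) are assumptions.

```java
final class LatenessSketch {
    enum Kind { IN_ORDER, OUT_OF_ORDER, LATE }

    // Classifies a record timestamp against the current stream time and grace period.
    static Kind classify(long recordTs, long streamTime, long graceMs) {
        if (recordTs >= streamTime) {
            return Kind.IN_ORDER;      // advances (or matches) stream time
        }
        if (recordTs >= streamTime - graceMs) {
            return Kind.OUT_OF_ORDER;  // older than stream time, but still within grace
        }
        return Kind.LATE;              // out-of-order and past the grace period: dropped
    }
}
```

With a stream time of 90 and a grace period of 10, a record at 85 is out-of-order but still processed, while a record at 70 is late.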



20) "There is only one option from the stream side and only recently is 
there a second option on the table side."


What are those options? Victoria already asked about the table side, but 
I am also not sure what option you mean for the stream side?



30) "If the table side uses a materialized version store the value is 
the latest by stream time rather than by offset within its defined grace 
period."


The phrase "the value is the latest by stream time" is confusing -- in 
the end, a versioned store holds multiple versions, not just one.



40) I am also wondering about ordering. In general, KS tries to preserve 
offset-order during processing (with some exceptions, when offset order 
preservation is not clearly defined). Given that the stream-side buffer 
is really just a "linear buffer", we could easily preserve offset-order. 
But I also see a benefit of re-ordering and emitting out-of-order data 
right away when read (instead of blocking them behind in-order records 
that are not ready yet). -- It might even be a possibility to let users 
pick an emit strategy, e.g. "EmitStrategy.preserveOffsets" (name just a 
placeholder).


The KIP should explain this in more detail and also discuss different 
options and mention them in "Rejected alternatives" in case we don't 
want to include them.
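
To make one of the options in 40) concrete, here is a minimal sketch of a stream-side buffer that releases records in timestamp order once the grace period has elapsed. It is an illustration under assumptions, not the KIP's design: `GraceBufferSketch` is a hypothetical class, only stream-side records advance stream time, and late records are silently dropped.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class GraceBufferSketch {
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;
    private long sequence = 0;
    // Entries are {timestamp, arrival sequence}, ordered by timestamp, then arrival.
    private final PriorityQueue<long[]> buffer = new PriorityQueue<>(
        Comparator.<long[]>comparingLong(e -> e[0]).thenComparingLong(e -> e[1]));

    GraceBufferSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // Buffers one record timestamp and returns the timestamps released by this call, oldest first.
    List<Long> add(long ts) {
        streamTime = Math.max(streamTime, ts);
        List<Long> released = new ArrayList<>();
        if (ts < streamTime - graceMs) {
            return released; // late record: dropped in this sketch
        }
        buffer.add(new long[] {ts, sequence++});
        while (!buffer.isEmpty() && buffer.peek()[0] <= streamTime - graceMs) {
            released.add(buffer.poll()[0]);
        }
        return released;
    }
}
```

An offset-preserving variant would replace the priority queue with a FIFO queue and hold back everything behind the oldest unreleased record, which is the trade-off the paragraph above describes.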



50) What happens when users change the grace period? Especially, when 
they turn it on/off (but also increasing/decreasing is an interesting 
point)? I think we should try to support this if possible; the 
"Compatibility" section needs to cover switching on/off in more detail.



-Matthias




On 5/2/23 2:06 PM, Victoria Xia wrote:

Cool KIP, Walker! Thanks for sharing this proposal.

A few clarifications:

1. Is the order that records exit the buffer in necessarily the same as the
order that records enter the buffer in, or no? Based on the description in
the KIP, it sounds like the answer is no, i.e., records will exit the
buffer in increasing timestamp order, which means that they may be reordered
(even for the same key) compared to the input order.

2. What happens if the join grace period is nonzero, and a stream-side
record arrives with a timestamp that is older than the current stream time
minus the grace period? Will this record trigger a join result, or will it
be dropped? Based on the description for what happens when the join grace
period is set to zero, it sounds like the late record will be dropped, even
if the join grace period is nonzero. Is that true?

3. What could cause stream time to advance, for purposes of removing
records from the join buffer? For example, will new records arriving on the
table side of the join cause stream time to advance? From the KIP it sounds
like only stream-side records will advance stream time -- does that mean
that the join processor itself will have to track this stream time?

Also +1 to Lucas's question about what options will be available for
configuring the join buffer. Will users have the option to choose whether
they want the buffer to be in-memory vs persistent?

- Victoria

On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy wrote:

Hi Walker,

thanks for the KIP! We definitely need this. I have two questions:

  - Have you considered allowing the customization of the underlying
buffer implementation? As I can see, `StreamJoined` lets you customize
the underlying store via a `WindowStoreSupplier`. Would it make sense
for `Joined` to have this as well? I can imagine one may want to limit
the number of records in the buffer, for example. If we hit the
maximum, the only option would be to drop semantic guarantees, but
users may still want to do this.
  - With "second option on the table side" you are referring to
versioned tables, right? Will the buffer on the stream side behave any
different whether the table side is versioned or not?

Finally, I think a simple example in the motivation section could help
non-experts understand the KIP.

Best,
Lucas

On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson wrote:

Hello everybody,

I have a stream proposal to improve the stream table join by adding a grace
period and buffer to the stream side of the join to allow processing in
timestamp order matching the recent improvements of the versioned tables.

Please take a look here and share your thoughts.

best,
Walker






[jira] [Commented] (KAFKA-14957) Default value for state.dir is confusing

2023-05-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719079#comment-17719079
 ] 

Matthias J. Sax commented on KAFKA-14957:
-

Ah. Thanks.

That's gonna be nasty to fix... This part of the docs is generated from the 
code... So it depends on the platform that does the build what ends up in the 
docs. Would require a larger change to generate it differently...

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>  Labels: beginner, newbie
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.
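
The environment dependence is easy to see in a few lines. The sketch below assumes the default is the JVM temp directory joined with `kafka-streams` via `File.separator`, which matches the double slash in the documented macOS path; `StateDirDefaultSketch` is a hypothetical name and this is not the actual `StreamsConfig` code.

```java
import java.io.File;

final class StateDirDefaultSketch {
    // Computes a state.dir default from the JVM's temp directory (assumed formula).
    static String defaultStateDir() {
        return System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams";
    }
}
```

Whatever machine runs the docs build determines the value that ends up in the generated documentation, which is why the ticket asks for a description of how the path is computed instead.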





[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14957:

Component/s: docs

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14957:

Labels: beginner newbie  (was: )

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Mickael Maison
>Priority: Minor
>  Labels: beginner, newbie
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





[jira] [Updated] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14957:

Priority: Minor  (was: Major)

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mickael Maison
>Priority: Minor
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





[jira] [Commented] (KAFKA-14957) Default value for state.dir is confusing

2023-05-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718759#comment-17718759
 ] 

Matthias J. Sax commented on KAFKA-14957:
-

[~mimaison] – Thanks.

What part of the docs are you referring to?

> Default value for state.dir is confusing
> 
>
> Key: KAFKA-14957
> URL: https://issues.apache.org/jira/browse/KAFKA-14957
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Mickael Maison
>Priority: Major
>
> The default value for state.dir is documented as 
> /var/folders/0t/68svdzmx1sld0mxjl8dgmmzmgq/T//kafka-streams
> This is misleading: the value will be different in each environment as it is 
> computed using System.getProperty("java.io.tmpdir"). We should update the 
> description to mention how the path is computed.





Re: [DISCUSS] Adding non-committers as Github collaborators

2023-04-28 Thread Matthias J. Sax

In general I am +1

The only question I have is about


You may only have 20 active collaborators at any given time per repository.


Not sure if this is a concern or not? I would assume not, but wanted to 
bring it to everyone's attention.


There is actually also a way to allow people to re-trigger Jenkins jobs: 
https://github.com/apache/kafka/pull/13578


Retriggering tests is a little bit more sensitive as our resources are 
limited, and we should avoid overwhelming Jenkins even more.



-Matthias


On 4/27/23 11:45 AM, David Arthur wrote:

Hey folks,

I stumbled across this wiki page from the infra team that describes the
various features supported in the ".asf.yaml" file:
https://cwiki.apache.org/confluence/display/INFRA/Git+-+.asf.yaml+features

One section that looked particularly interesting was
https://cwiki.apache.org/confluence/display/INFRA/Git+-+.asf.yaml+features#Git.asf.yamlfeatures-AssigningexternalcollaboratorswiththetriageroleonGitHub

github:
  collaborators:
    - userA
    - userB

This would allow us to define non-committers as collaborators on the Github
project. Concretely, this means they would receive the "triage" Github role
(defined here
https://docs.github.com/en/organizations/managing-user-access-to-your-organizations-repositories/repository-roles-for-an-organization#permissions-for-each-role).
Practically, this means we could let non-committers do things like assign
labels and reviewers on Pull Requests.

I wanted to see what the committer group thought about this feature. I
think it could be useful.

Cheers,
David


