[jira] [Commented] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-12 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854483#comment-17854483
 ] 

Ayoub Omari commented on KAFKA-16925:
-

> Yes, but it's tracked on a per-task basis, and thus, it could be ahead of the 
> time the operator tracked

My understanding is that the processor here wants to know the maximum stream 
time observed so far (including the current record), and 
context.currentStreamTimeMs() is set from the timestamps of input records. Isn't 
that exactly the information this processor is looking for?

I am not talking about the general case, where I agree that it is better to put 
the logic in the store itself, or in some other way that is reusable across 
processors.

For this particular case, the processor maintains a buffer and wants to evict 
from it periodically. Since it is the processor that drives the eviction, and 
the buffer has no eviction policy of its own (unlike, e.g., window stores), I 
was thinking it could make sense to maintain this information within the 
processor rather than within the buffer. A minimal sketch of that idea follows.
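A hedged sketch (names like doJoin, buffer, and gracePeriod are illustrative, 
not the actual KStreamKTableJoinProcessor code):
{code:java}
// Seed observedStreamTime from the context: context.currentStreamTimeMs()
// already covers the current record and, unlike the store-backed value, is
// recovered from committed offset metadata after a restart.
observedStreamTime = Math.max(observedStreamTime, context.currentStreamTimeMs());
final long expiryTime = observedStreamTime - gracePeriod.toMillis();
if (record.timestamp() < expiryTime) {
    doJoin(record);     // late record: join and forward immediately
} else {
    buffer.put(record); // within the grace period: buffer until eviction
}{code}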

> stream-table join does not immediately forward expired records on restart
> -
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream 
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has 
> elapsed. 
> When the grace period elapses, the record is joined with its most recent 
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and 
> will dequeue when the record timestamp is less than or equal to stream time 
> minus the grace period.  Late records, out of the grace period, will be 
> executed right as they come in. (KIP-923){code}
>  
> However, this is not the case today on rebalance or restart. The reason is 
> that observedStreamTime is taken from the underlying state store, which loses 
> this information on rebalance/restart:  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>  
> If the task restarts and receives an expired record, the buffer considers 
> that this record has the maximum stream time observed so far, and puts it in 
> the buffer instead of immediately joining it.
>  
> {*}Example{*}:
>  * Grace period = 60s
>  * KTable contains (key, rightValue)
>  
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
> streamTime = T{code}
>  
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime 
> = T - 60s{code}
>  
> The processor should instead use currentStreamTime from the Context, which is 
> recovered on restart.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-11 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853954#comment-17853954
 ] 

Ayoub Omari commented on KAFKA-16925:
-

I think what makes `context.currentStreamTimeMs()` work is that this timestamp 
is stored along with the offset at each commit inside __consumer_offsets. On 
restart, we retrieve the last processed offset and also the timestamp. 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L975]
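A hedged illustration of that round trip using the plain consumer API (the real 
encoding in StreamTask is a binary format, and consumer/partition/streamTime are 
assumed to be in scope; this only shows the mechanism):
{code:java}
// Commit: piggy-back the observed stream time on the offset metadata.
consumer.commitSync(Map.of(partition,
    new OffsetAndMetadata(lastProcessedOffset + 1, Long.toString(streamTime))));

// Restart: the metadata comes back together with the committed offset.
final OffsetAndMetadata committed = consumer.committed(Set.of(partition)).get(partition);
final long recoveredStreamTime = Long.parseLong(committed.metadata());{code}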

> stream-table join does not immediately forward expired records on restart
> -
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream 
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has 
> elapsed. 
> When the grace period elapses, the record is joined with its most recent 
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record will enter a stream buffer and 
> will dequeue when the record timestamp is less than or equal to stream time 
> minus the grace period.  Late records, out of the grace period, will be 
> executed right as they come in. (KIP-923){code}
>  
> However, this is not the case today on rebalance or restart. The reason is 
> that observedStreamTime is taken from the underlying state store, which loses 
> this information on rebalance/restart:  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]
>  
> If the task restarts and receives an expired record, the buffer considers 
> that this record has the maximum stream time observed so far, and puts it in 
> the buffer instead of immediately joining it.
>  
> {*}Example{*}:
>  * Grace period = 60s
>  * KTable contains (key, rightValue)
>  
> +Normal scenario+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
> streamTime = T{code}
>  
> +Scenario with rebalance+
> {code:java}
> streamInput1 (key, value1) <--- time = T : put in buffer
> // --- rebalance ---
> streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime 
> = T - 60s{code}
>  
> The processor should instead use currentStreamTime from the Context, which is 
> recovered on restart.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-10 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16925:

Description: 
[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
 introduced a grace period for KStreamKTableJoin. This allows joining a stream to 
a KTable backed by a versioned state store.

Upon receiving a record, it is put in a buffer until the grace period has 
elapsed. 
When the grace period elapses, the record is joined with its most recent match 
from the versioned state store.

+Late records+ are +not+ put in the buffer and are immediately joined.

 
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will 
dequeue when the record timestamp is less than or equal to stream time minus 
the grace period.  Late records, out of the grace period, will be executed 
right as they come in. (KIP-923){code}
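For reference, a hedged sketch of how such a join is set up (topic names are 
illustrative, builder is a StreamsBuilder, and default String serdes are assumed 
via config):
{code:java}
// The table must be materialized as a versioned store (KIP-889) for the
// grace-period join; the grace period itself is set on Joined (KIP-923).
final KTable<String, String> table = builder.table("table-input",
    Materialized.<String, String>as(
        Stores.persistentVersionedKeyValueStore("versioned-store", Duration.ofHours(1))));
builder.<String, String>stream("stream-input")
    .join(table, (streamValue, tableValue) -> streamValue + "," + tableValue,
        Joined.<String, String, String>as("grace-join").withGracePeriod(Duration.ofSeconds(60)))
    .to("join-output");{code}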
 

However, this is not the case today on rebalance or restart. The reason is that 
observedStreamTime is taken from the underlying state store, which loses this 
information on rebalance/restart:  
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java#L164]

 

If the task restarts and receives an expired record, the buffer considers that 
this record has the maximum stream time observed so far, and puts it in the 
buffer instead of immediately joining it.

 

{*}Example{*}:
 * Grace period = 60s
 * KTable contains (key, rightValue)

 

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
streamTime = T{code}
 

+Scenario with rebalance+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

// --- rebalance ---

streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = 
T - 60s{code}
 

The processor should instead use currentStreamTime from the Context, which is 
recovered on restart.

 

  was:
[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
 introduced a grace period for KStreamKTableJoin. This allows joining a stream to 
a KTable backed by a versioned state store.

Upon receiving a record, it is put in a buffer until the grace period has 
elapsed. 
When the grace period elapses, the record is joined with its most recent match 
from the versioned state store.

+Late records+ are +not+ put in the buffer and are immediately joined.

 
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will 
dequeue when the record timestamp is less than or equal to stream time minus 
the grace period.  Late records, out of the grace period, will be executed 
right as they come in. (KIP-923){code}
 

However, this is not the case today on rebalance or restart. The reason is that 
observedStreamTime is maintained in a variable that is lost on 
rebalance/restart: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]

 

If the task restarts and receives an expired record, it considers that it has 
the maximum stream time observed so far, and puts it in the buffer instead of 
immediately joining it.

 

{*}Example{*}:
 * Grace period = 60s
 * KTable contains (key, rightValue)

 

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
streamTime = T{code}
 

+Scenario with rebalance+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

// --- rebalance ---

streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = 
T - 60s{code}
 

The processor should instead use currentStreamTime from the Context, which is 
recovered on restart.

 


> stream-table join does not immediately forward expired records on restart
> -
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream 
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has 
> elapsed. 
> When the grace period elapses, the record is joined with its most recent 
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> 

[jira] [Updated] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-10 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16925:

Description: 
[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
 introduced a grace period for KStreamKTableJoin. This allows joining a stream to 
a KTable backed by a versioned state store.

Upon receiving a record, it is put in a buffer until the grace period has 
elapsed. 
When the grace period elapses, the record is joined with its most recent match 
from the versioned state store.

+Late records+ are +not+ put in the buffer and are immediately joined.

 
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will 
dequeue when the record timestamp is less than or equal to stream time minus 
the grace period.  Late records, out of the grace period, will be executed 
right as they come in. (KIP-923){code}
 

However, this is not the case today on rebalance or restart. The reason is that 
observedStreamTime is maintained in a variable that is lost on 
rebalance/restart: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]

 

If the task restarts and receives an expired record, it considers that it has 
the maximum stream time observed so far, and puts it in the buffer instead of 
immediately joining it.

 

{*}Example{*}:
 * Grace period = 60s
 * KTable contains (key, rightValue)

 

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
streamTime = T{code}
 

+Scenario with rebalance+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

// --- rebalance ---

streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = 
T - 60s{code}
 

The processor should instead use currentStreamTime from the Context, which is 
recovered on restart.

 

  was:
[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
 introduced a grace period for KStreamKTableJoin. This allows joining a stream to 
a KTable backed by a versioned state store.

Upon receiving a record, it is put in a buffer until the grace period has 
elapsed. 
When the grace period elapses, the record is joined with its most recent match 
from the versioned state store.

+Late records+ are +not+ put in the buffer and are immediately joined.

 
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will 
dequeue when the record timestamp is less than or equal to stream time minus 
the grace period.  Late records, out of the grace period, will be executed 
right as they come in. (KIP-923){code}
 

However, this is not the case today on rebalance or restart. The reason is that 
observedStreamTime is maintained in a variable that is lost on 
rebalance/restart: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]

 

If the task restarts and receives an expired record, it considers that it has 
the maximum stream time observed so far, and puts it in the buffer instead of 
immediately joining it.

 

{*}Example{*}:
 * Grace period = 60s
 * KTable contains (key, rightValue)

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
streamTime = T{code}
+Scenario with rebalance+

 
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

// --- rebalance ---

streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = 
T - 60s{code}
 

The processor should instead use currentStreamTime from the Context, which is 
recovered on restart.

 


> stream-table join does not immediately forward expired records on restart
> -
>
> Key: KAFKA-16925
> URL: https://issues.apache.org/jira/browse/KAFKA-16925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> [KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
>  introduced a grace period for KStreamKTableJoin. This allows joining a stream 
> to a KTable backed by a versioned state store.
> Upon receiving a record, it is put in a buffer until the grace period has 
> elapsed. 
> When the grace period elapses, the record is joined with its most recent 
> match from the versioned state store.
> +Late records+ are +not+ put in the buffer and are immediately joined.
>  
> {code:java}
> If the grace period is non zero, the record 

[jira] [Created] (KAFKA-16925) stream-table join does not immediately forward expired records on restart

2024-06-10 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16925:
---

 Summary: stream-table join does not immediately forward expired 
records on restart
 Key: KAFKA-16925
 URL: https://issues.apache.org/jira/browse/KAFKA-16925
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Ayoub Omari
Assignee: Ayoub Omari


[KIP-923|https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join]
 introduced a grace period for KStreamKTableJoin. This allows joining a stream to 
a KTable backed by a versioned state store.

Upon receiving a record, it is put in a buffer until the grace period has 
elapsed. 
When the grace period elapses, the record is joined with its most recent match 
from the versioned state store.

+Late records+ are +not+ put in the buffer and are immediately joined.

 
{code:java}
If the grace period is non zero, the record will enter a stream buffer and will 
dequeue when the record timestamp is less than or equal to stream time minus 
the grace period.  Late records, out of the grace period, will be executed 
right as they come in. (KIP-923){code}
 

However, this is not the case today on rebalance or restart. The reason is that 
observedStreamTime is maintained in a variable that is lost on 
rebalance/restart: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java#L54]

 

If the task restarts and receives an expired record, it considers that it has 
the maximum stream time observed so far, and puts it in the buffer instead of 
immediately joining it.

 

{*}Example{*}:
 * Grace period = 60s
 * KTable contains (key, rightValue)

+Normal scenario+
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

streamInput2 (key, value2) <--- time = T - 60s : immediately joined // 
streamTime = T{code}
+Scenario with rebalance+

 
{code:java}
streamInput1 (key, value1) <--- time = T : put in buffer

// --- rebalance ---

streamInput2 (key, value2) <--- time = T - 60s : put in buffer // streamTime = 
T - 60s{code}
 

The processor should instead use currentStreamTime from the Context, which is 
recovered on restart.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-14 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846363#comment-17846363
 ] 

Ayoub Omari commented on KAFKA-16700:
-

[~stoeckmk] About KIP-962: it's not about null foreign keys, it's only about 
null keys in the left topic. So null foreign keys still behave the same way as 
before the KIP.

 

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> --
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>Reporter: Karsten Stöckmann
>Priority: Major
>  Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* {*}FK Left Join{*}.
> Assume the following snippet:
> {code:java}
> streamsBuilder
> .table(
> folderTopicName,
> Consumed.with(
> folderKeySerde,
> folderSerde))
> .leftJoin(
> agencies, // KTable
> Folder::agencyIdValue,
> AggregateFolder::new,
> TableJoined.as("folder-to-agency"),
> Materialized
> .as("folder-to-agency-materialized")
> .withKeySerde(folderKeySerde)
> .withValueSerde(aggregateFolderSerde))
> .leftJoin(
> documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the above shown KTable-KTable FK Left Join seems to 
> produce message loss on the first of a series of FK Left Joins; the right 
> hand {{KTable}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
>  * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
>  * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
>  * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.
> Judging from the nature of a (FK) Left Join, I'd expect all messages from the 
> left-hand topic to produce an aggregate even if no matching message is 
> found in the right-hand topic.
> Message loss unpredictably varies across tests and seems not to be bound to 
> specific keys or messages.
> As it seems, this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. Streams applications 
> already running) seems not to show this behaviour. Additionally, as of yet we 
> observed this kind of message loss only when running multiple replicas of the 
> affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We've tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka-streams.cache.max.bytes.buffering: "20971520"
> kafka-streams.commit.interval.ms: 100
> kafka-streams.fetch.max.bytes: "10485760"
> kafka-streams.max.request.size: "10485760"
> kafka-streams.max.partition.fetch.bytes: "10485760"
> kafka-streams.metadata.max.age.ms: 30
> kafka-streams.statestore.cache.max.bytes: "20971520"
> kafka-streams.topology.optimization: all
> kafka-streams.processing.guarantee: exactly_once_v2
> # Kafka Streams Intermediate Topics
> kafka-streams.topic.compression.type: lz4
> 

[jira] [Assigned] (KAFKA-10369) Introduce Distinct operation in KStream

2024-05-13 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-10369:
---

Assignee: Ayoub Omari  (was: Ivan Ponomarev)

> Introduce Distinct operation in KStream
> ---
>
> Key: KAFKA-10369
> URL: https://issues.apache.org/jira/browse/KAFKA-10369
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ivan Ponomarev
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: kip
>
> Message deduplication is a common task.
> One example: we might have multiple data sources, each reporting its state 
> periodically with a relatively high frequency, whose current states should be 
> stored in a database. If the actual state changes less frequently than it is 
> reported, we might want to filter out duplicated messages using Kafka Streams 
> in order to reduce the number of writes to the database.
> A 'Distinct' operation is common in data processing, e.g.
>  * Java Stream has [distinct() 
> |https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--]
>  operation,
>  * SQL has DISTINCT keyword.
>  
> Hence it is natural to expect similar functionality from Kafka Streams.
> Although the Kafka Streams Tutorials contain an 
> [example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html]
>  of how distinct can be emulated, this example is complicated: it involves 
> low-level coding with a local state store and a custom transformer. It might 
> be much more convenient to have distinct as a first-class DSL operation.
> Due to 'infinite' nature of KStream, distinct operation should be windowed, 
> similar to windowed joins and aggregations for KStreams.
> See 
> [KIP-655|https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-13 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845956#comment-17845956
 ] 

Ayoub Omari commented on KAFKA-16700:
-

Also, because you are chaining more than one leftJoin, you should check that 
the second leftJoin's foreignKeyExtractor never returns null.

To investigate further, I would suggest writing the result of the first 
leftJoin to an intermediate topic and checking whether that topic has the 
expected number of records (= the number of input records). If so, the 
issue comes from the second leftJoin. 
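A hedged sketch of that check (the intermediate topic name is made up, and 
aggregatedFolders is a hypothetical handle on the first leftJoin's result; the 
serdes are the ones from your snippet):
{code:java}
// Materialize the first leftJoin's result to a debug topic, then compare its
// record count with folderTopicName's count.
aggregatedFolders
    .toStream()
    .to("folder-to-agency-debug",
        Produced.with(folderKeySerde, aggregateFolderSerde));{code}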

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> --
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>Reporter: Karsten Stöckmann
>Priority: Major
>  Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* {*}FK Left Join{*}.
> Assume the following snippet:
> {code:java}
> streamsBuilder
> .table(
> folderTopicName,
> Consumed.with(
> folderKeySerde,
> folderSerde))
> .leftJoin(
> agencies, // KTable
> Folder::agencyIdValue,
> AggregateFolder::new,
> TableJoined.as("folder-to-agency"),
> Materialized
> .as("folder-to-agency-materialized")
> .withKeySerde(folderKeySerde)
> .withValueSerde(aggregateFolderSerde))
> .leftJoin(
> documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the above shown KTable-KTable FK Left Join seems to 
> produce message loss on the first of a series of FK Left Joins; the right 
> hand {{KTable}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
>  * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
>  * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
>  * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.
> Judging from the nature of a (FK) Left Join, I'd expect all messages from the 
> left-hand topic to produce an aggregate even if no matching message is 
> found in the right-hand topic.
> Message loss unpredictably varies across tests and seems not to be bound to 
> specific keys or messages.
> As it seems, this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. Streams applications 
> already running) seems not to show this behaviour. Additionally, as of yet we 
> observed this kind of message loss only when running multiple replicas of the 
> affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We've tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka-streams.cache.max.bytes.buffering: "20971520"
> kafka-streams.commit.interval.ms: 100
> kafka-streams.fetch.max.bytes: "10485760"
> kafka-streams.max.request.size: "10485760"
> kafka-streams.max.partition.fetch.bytes: "10485760"
> kafka-streams.metadata.max.age.ms: 30
> kafka-streams.statestore.cache.max.bytes: 

[jira] [Commented] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-13 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845950#comment-17845950
 ] 

Ayoub Omari commented on KAFKA-16700:
-

[~stoeckmk] Did you check that the foreignKey column is never null in the 
source table (i.e. folderTopic)?

For me, that's the only way the internal topic 
_folder-to-agency-subscription-response (3048)_ would have a smaller number of 
records than {_}folderTopic{_}.

Also, assuming all records in _folderTopic_ have a different key, that's the 
only way the topic _folder-to-agency-subscription-registration (6967)_ would 
have a higher number of records than _folderTopic (6828)_.

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> --
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>Reporter: Karsten Stöckmann
>Priority: Major
>  Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* {*}FK Left Join{*}.
> Assume the following snippet:
> {code:java}
> streamsBuilder
> .table(
> folderTopicName,
> Consumed.with(
> folderKeySerde,
> folderSerde))
> .leftJoin(
> agencies, // KTable
> Folder::agencyIdValue,
> AggregateFolder::new,
> TableJoined.as("folder-to-agency"),
> Materialized
> .as("folder-to-agency-materialized")
> .withKeySerde(folderKeySerde)
> .withValueSerde(aggregateFolderSerde))
> .leftJoin(
> documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the above shown KTable-KTable FK Left Join seems to 
> produce message loss on the first of a series of FK Left Joins; the right 
> hand {{KTable}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
>  * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
>  * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
>  * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.
> Judging from the nature of a (FK) Left Join, I'd expect all messages from the 
> left-hand topic to produce an aggregate even if no matching message is 
> found in the right-hand topic.
> Message loss unpredictably varies across tests and seems not to be bound to 
> specific keys or messages.
> As it seems, this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. Streams applications 
> already running) seems not to show this behaviour. Additionally, as of yet we 
> observed this kind of message loss only when running multiple replicas of the 
> affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We've tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka-streams.cache.max.bytes.buffering: "20971520"
> kafka-streams.commit.interval.ms: 100
> kafka-streams.fetch.max.bytes: "10485760"
> kafka-streams.max.request.size: "10485760"
> kafka-streams.max.partition.fetch.bytes: "10485760"
> 

[jira] [Comment Edited] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-13 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845950#comment-17845950
 ] 

Ayoub Omari edited comment on KAFKA-16700 at 5/13/24 2:06 PM:
--

[~stoeckmk] Did you check that the foreignKey column is never null in the 
source table (i.e. folderTopic)?

For me, that's the only way the internal topic 
_folder-to-agency-subscription-response (3048)_ would have a smaller number of 
records than {_}folderTopic{_}.

Also, assuming all records in _folderTopic_ have a different key, that's the 
only way the topic _folder-to-agency-subscription-registration (6967)_ would 
have a higher number of records than _folderTopic (6828)_.


was (Author: JIRAUSER302607):
[~stoeckmk] Did you check if the foreignKey column is never null in the source 
table (i.e. folderTopic) ?

For me, that's the only way  the internal topic 
_folder-to-agency-subscription-response (3048) **_ would have smaller number of 
records than {_}folderTopic{_}.

Also, assuming all records in _folderTopic_ have a different key, that's the 
only way the topic _folder-to-agency-subscription-registration (6967)_ would 
have higher number of records than _folderTopic (6828)_

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> --
>
> Key: KAFKA-16700
> URL: https://issues.apache.org/jira/browse/KAFKA-16700
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
> Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 
> 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka 
> Operators
>Reporter: Karsten Stöckmann
>Priority: Major
>  Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / 
> unexplainable message loss on a Kafka Streams topology while performing a 
> *KTable-KTable* {*}FK Left Join{*}.
> Assume the following snippet:
> {code:java}
> streamsBuilder
> .table(
> folderTopicName,
> Consumed.with(
> folderKeySerde,
> folderSerde))
> .leftJoin(
> agencies, // KTable
> Folder::agencyIdValue,
> AggregateFolder::new,
> TableJoined.as("folder-to-agency"),
> Materialized
> .as("folder-to-agency-materialized")
> .withKeySerde(folderKeySerde)
> .withValueSerde(aggregateFolderSerde))
> .leftJoin(
> documents,
> {code}
> The setup is as follows:
> A Debezium Connector for PostgreSQL streams database changes into various 
> Kafka topics. A series of Quarkus Kafka Streams applications then performs 
> aggregation operations on those topics to create index documents later to be 
> sent into an OpenSearch system.
> When firing up the Kafka Streams infrastructure to work on initially 
> populated Kafka Topics (i.e. a snapshot of all relevant table data has been 
> streamed to Kafka), the above shown KTable-KTable FK Left Join seems to 
> produce message loss on the first of a series of FK Left Joins; the right 
> hand {{KTable}} is consumed from an aggregated 
> topic fed from another Kafka Streams topology / application.
> On a (heavily reduced) test data set of 6828 messages in the 
> {{folderTopicName}} Topic, we observe the following results:
>  * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
>  * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
>  * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
>  * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.
> Judging from the nature of a (FK) Left Join, I'd expect all messages from the 
> left-hand topic to produce an aggregate even if no matching message is 
> found in the right-hand topic.
> Message loss unpredictably varies across tests and seems not to be bound to 
> specific keys or messages.
> As it seems, this can only be observed when initially firing up the Streams 
> infrastructure to process the message 'backlog' that had been snapshotted by 
> Debezium. A manual snapshot triggered later (i.e. Streams applications 
> already running) seems not to show this behaviour. Additionally, as of yet we 
> observed this kind of message loss only when running multiple replicas of the 
> affected application. When carrying out the tests with only one replica, 
> everything seems to work as expected. We've tried to leverage 
> {{group.initial.rebalance.delay.ms}} in order to rule out possible 
> rebalancing issues, but to no avail.
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> 

[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache

2024-05-09 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845142#comment-17845142
 ] 

Ayoub Omari commented on KAFKA-4212:


My company was looking for such a feature earlier this year. As a work-around 
we use a processor+punctuator similar to what [~savulchik] suggests. The 
problem is that we end up doing this for every KS application in our system.

Potential suggestion: in the _Topology_ class we can both _addStateStore_ and 
{_}addProcessor{_}, so why not overload _addStateStore_ with an additional TTL 
parameter and define the processor+punctuator inside it (so the user wouldn't 
have to do it themselves)? If we are unsure how frequently to punctuate, we can 
give control to the user (TTL would be a class with an additional period field)
{code:java}
TTL(value: Duration, punctuateInterval: Duration){code}
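A hedged sketch of the processor+punctuator work-around itself (store name, 
TTL, and punctuation interval are illustrative; imports from 
org.apache.kafka.streams.processor.api and org.apache.kafka.streams.state are 
assumed):
{code:java}
public class TtlProcessor implements Processor<String, String, Void, Void> {
    private static final Duration TTL = Duration.ofHours(1); // illustrative TTL

    private KeyValueStore<String, ValueAndTimestamp<String>> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("ttl-store"); // illustrative store name
        // Periodically scan the store and evict entries older than the TTL.
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, now -> {
            try (final KeyValueIterator<String, ValueAndTimestamp<String>> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, ValueAndTimestamp<String>> entry = it.next();
                    if (now - entry.value.timestamp() > TTL.toMillis()) {
                        store.delete(entry.key); // expired: evict
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // Remember when each key was last written, for the eviction scan above.
        store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
    }
}{code}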

> Add a key-value store that is a TTL persistent cache
> 
>
> Key: KAFKA-4212
> URL: https://issues.apache.org/jira/browse/KAFKA-4212
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Priority: Major
>  Labels: api
>
> Some jobs need to maintain as state a large set of key-values for some 
> period of time.  I.e. they need to maintain a TTL cache of values potentially 
> larger than memory. 
> Currently Kafka Streams provides non-windowed and windowed key-value stores.  
> Neither is an exact fit to this use case.  
> The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as 
> required, but does not support expiration.  The TTL option of RocksDB is 
> explicitly not used.
> The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment 
> dropping, but it stores multiple items per key, based on their timestamp.  
> But this store can be repurposed as a cache by fetching the items in reverse 
> chronological order and returning the first item found.
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here 
> we desire a variable-capacity memory-overflowing TTL caching store.
> Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be 
> useful to have an official and proper TTL cache API and implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842285#comment-17842285
 ] 

Ayoub Omari commented on KAFKA-16644:
-

Hi [~mjsax], I think this is the same issue reported in KAFKA-16394

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in the 3.7.0 release via KAFKA-14778. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255]).
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-20 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839274#comment-17839274
 ] 

Ayoub Omari commented on KAFKA-16573:
-

Thanks [~mjsax], I am going to work on it in this case.

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Minor
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also matters for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-20 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-16573:
---

Assignee: Ayoub Omari

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Minor
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also matters for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-18 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838672#comment-17838672
 ] 

Ayoub Omari commented on KAFKA-16573:
-

[~ableegoldman] [~mjsax] I looked a bit into this, and found that this error 
may appear only in +three+ cases:
 * Source node serde absent
 * Sink node serde absent
 * Store serde absent

For the first two cases it is easy to improve the error by showing the type of 
node (source or sink) and its name.

For the third case, which is the case in the description, Kafka Streams doesn't 
know the processor node behind the error when checking serdes, because this 
check happens during store initialization. At that moment, it only knows the 
name of the store.

But I think showing the name of the store may help (even for internal stores)? 
We could say something like "The serdes of the store 
KTABLE-AGGREGATE-STATE-STORE-04 are not specified", as sketched below. WDYT?
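A hedged sketch of that third case (hypothetical; the actual change would live 
around the initStoreSerde call visible in the stack trace):
{code:java}
// Wrap the serde lookup so the failure names the store:
try {
    initStoreSerde(context);
} catch (final ConfigException cause) {
    throw new ConfigException(
        "The serdes of the store " + name() + " are not specified: " + cause.getMessage());
}{code}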

> Streams does not specify where a Serde is needed
> 
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> Example topology:
> {code:java}
>  builder
>.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
>.groupBy((key, value) => new KeyValue(value, key))
>.count()
>.toStream()
>.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
>  {code}
> At runtime, we get the following exception 
> {code:java}
> Please specify a key serde or set one through 
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
>     at 
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
>     at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
>     at 
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
>     at 
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
>     at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing 
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy API 
> doesn't force you to define a Grouped, it can be missed, and it could be 
> difficult to spot in a more complex topology. 
> This also matters for someone who needs control over serdes in the topology 
> and doesn't want to define default serdes.
>  
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-17 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16573:
---

 Summary: Streams does not specify where a Serde is needed
 Key: KAFKA-16573
 URL: https://issues.apache.org/jira/browse/KAFKA-16573
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari


Example topology:
{code:java}
 builder
   .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
   .groupBy((key, value) => new KeyValue(value, key))
   .count()
   .toStream()
   .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
 {code}
At runtime, we get the following exception 
{code:java}
Please specify a key serde or set one through 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or 
set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at 
org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at 
org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
The error does not give information about the line or the processor causing the 
issue.

Here a Grouped was missing inside the groupBy, but because the groupBy API 
doesn't force you to define a Grouped, it can be missed, and it could be 
difficult to spot in a more complex topology. 

This also matters for someone who needs control over serdes in the topology and 
doesn't want to define default serdes.

 

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16458) Add contains method in KeyValue store interface

2024-04-02 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16458:
---

 Summary: Add contains method in KeyValue store interface
 Key: KAFKA-16458
 URL: https://issues.apache.org/jira/browse/KAFKA-16458
 Project: Kafka
  Issue Type: Wish
  Components: streams
Reporter: Ayoub Omari


In some stream processors, we sometimes just want to check if a key exists in 
the state store or not.

 

I find calling .get() and checking if the return value is null a little bit 
verbose
{code:java}
if (store.get(key) != null) {

}{code}
 

But I am not sure whether the store interface is intentionally kept minimal.
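A hedged sketch of what this could look like, e.g. as a default method on the 
read-only store interface (hypothetical, not part of the current API):
{code:java}
// Hypothetical addition to ReadOnlyKeyValueStore<K, V>:
default boolean contains(final K key) {
    return get(key) != null;
}{code}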



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-29 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

Description: 
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1"))
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non-existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 

  was:
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+}, where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 


> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1"))
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Commented] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-28 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831702#comment-17831702
 ] 

Ayoub Omari commented on KAFKA-16434:
-

I submitted a bug fix for this in the same PR as KAFKA-16407: 
https://github.com/apache/kafka/pull/15615

> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-28 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-16434:
---

Assignee: Ayoub Omari

> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-28 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

External issue URL:   (was: https://github.com/apache/kafka/pull/15615)

> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-28 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

External issue URL: https://github.com/apache/kafka/pull/15615

> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-28 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16407:

Affects Version/s: 3.3.2
   3.2.3
   3.1.2
   3.0.2
   2.8.2

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-28 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16407:

Affects Version/s: 3.6.1
   3.5.2
   3.4.1

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1, 3.5.2, 3.7.0, 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

Description: 
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+}, where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 

  was:
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*

 
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*

 
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+}, where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 


> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
>  
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Updated] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16434:

Description: 
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+}, where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 

  was:
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+}, where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 


> ForeignKey INNER join does not unset join result when FK becomes null
> -
>
> Key: KAFKA-16434
> URL: https://issues.apache.org/jira/browse/KAFKA-16434
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic. (same topology example as 
> in KAFKA-16407)
>  
> *Scenario: Unset foreign key of a primary key*
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
> leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
> {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 3) {code}
>  
> *+Expected result+*
> {code:java}
> KeyValue(pk1, 3)
> KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
>  
> However, in {+}other cases{+}, where the join result should be unset (e.g. 
> the primary key is deleted, or the foreign key changes to a non existing FK), 
> that record is {+}correctly emitted{+}.
>  
> Also, the importance of unsetting the join result is mentioned in the code: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36]
> {code:java}
> //[...] Additionally, propagate null if no FK is found there,
> // since we must "unset" any output set by the previous FK-join. This is true 
> for both INNER and LEFT join. {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Created] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16434:
---

 Summary: ForeignKey INNER join does not unset join result when FK 
becomes null
 Key: KAFKA-16434
 URL: https://issues.apache.org/jira/browse/KAFKA-16434
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0, 2.8.2
Reporter: Ayoub Omari


We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic. (same topology example as in 
KAFKA-16407)

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", ProductValue("fk1", "pk1")) 
leftTopic.pipeInput("pk1", ProductValue(null, "pk1"))
{code}
 

*+Actual result+*

 
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*

 
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+}, where the join result should be unset (e.g. the 
primary key is deleted, or the foreign key changes to a non existing FK), that 
record is {+}correctly emitted{+}.

 

Also, the importance of unsetting the join result is mentioned in the code: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true 
for both INNER and LEFT join. {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-27 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831501#comment-17831501
 ] 

Ayoub Omari commented on KAFKA-16407:
-

Actually, this behavior is not covered by this KIP. It is caused by a condition 
that has existed since 2.8, which wrongly assumes that if the old foreign key 
is null then the current record should be skipped. From what I saw, the 
implementation of the KIP didn't change that code.
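
A hypothetical simplification of that condition, for illustration only (the 
real check lives in SubscriptionSendProcessorSupplier; names are mine):
{code:scala}
// Buggy shape: the record is dropped whenever the *old* FK is null,
// even though the *new* FK may now be set and should trigger a subscription
def shouldSkip(oldForeignKey: String, newForeignKey: String): Boolean =
  oldForeignKey == null

// A fix would only skip when there is neither a subscription to send
// nor a previous join result to unset
def shouldSkipFixed(oldForeignKey: String, newForeignKey: String): Boolean =
  oldForeignKey == null && newForeignKey == null
{code}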

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-26 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-16407:
---

Assignee: Ayoub Omari

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-22 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16407:

Affects Version/s: 3.6.1
   3.5.2
   3.4.1
   3.3.2
   3.2.3
   3.1.2
   3.0.2
   2.8.2

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-22 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16407:
---

 Summary: ForeignKey INNER join ignores FK change when its previous 
value is null
 Key: KAFKA-16407
 URL: https://issues.apache.org/jira/browse/KAFKA-16407
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic.

 

*Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
{code:scala}
rightTopic.pipeInput("fk", "1")

leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1){code}
 

*+Actual result+*
{code:scala}
# No output !

# Logs:

20:14:29,723 WARN  
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
  - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
topic=[left-topic] partition=[0] offset=[0]

20:14:29,728 WARN  
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
  - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
topic=[left-topic] partition=[0] offset=[1]
{code}
 

After looking into the code, I believe this is the line behind the issue: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-22 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16407:

Attachment: InnerFKJoinTest.scala
JsonSerde.scala

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Priority: Major
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-03-22 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16394:

Description: 
We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple LEFT foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic.

 

+*Scenario 1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes.

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.

 

This bug doesn't exist on versions 3.6.0 and below.

 

I believe the issue comes from this line: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134], 
where we propagate the deletion in the two scenarios above.

 

Attaching the topology I used.
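
For completeness, a minimal sketch of how Scenario 1 can be replayed with 
TopologyTestDriver; the topology, the LeftRecord serializer, and the "output" 
sink topic name are assumptions here (the real versions are in the attached 
ForeignJoinTest.scala and JsonSerde.scala):
{code:scala}
import java.util.Properties
import org.apache.kafka.common.serialization.{Serializer, StringDeserializer, StringSerializer}
import org.apache.kafka.streams.{Topology, TopologyTestDriver}

case class LeftRecord(foreignKey: String, name: String)

// Placeholders: see the attached ForeignJoinTest.scala / JsonSerde.scala
val topology: Topology = ???
val leftSerializer: Serializer[LeftRecord] = ???

val props = new Properties()
props.put("application.id", "fk-left-join-repro")
props.put("bootstrap.servers", "dummy:1234")

val driver = new TopologyTestDriver(topology, props)
val rightTopic = driver.createInputTopic("right-topic", new StringSerializer, new StringSerializer)
val leftTopic  = driver.createInputTopic("left-topic", new StringSerializer, leftSerializer)
val output     = driver.createOutputTopic("output", new StringDeserializer, new StringDeserializer)

// Scenario 1: change foreignKey
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")
leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))

// On 3.7.0 this shows the extra KeyValue(pk1, null) described above
println(output.readKeyValuesToList())
{code}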

  was:
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple foreign key join on left-topic's foreignKey field which returns 
the value in right-topic.

 

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.

 

This bug doesn't exist on versions 3.6.0 and below.

 

I believe the issue comes from the line 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where we propagate the deletion in the two scenarios above

 

Attaching the topology I used.


> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple LEFT foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A null is propagated to the join result when the foreign key changes
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", 

[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-03-22 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16394:

Description: 
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic.

 

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.

 

This bug doesn't exist on versions 3.6.0 and below.

 

I believe the issue comes from the line 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where we propagate the deletion in the two scenarios above

 

Attaching the topology I used.

  was:
We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple LEFT foreign key join on left-topic's foreignKey field. The 
resulting join value is the value in right-topic.

 

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.

 

This bug doesn't exist on versions 3.6.0 and below.

 

I believe the issue comes from the line 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where we propagate the deletion in the two scenarios above

 

Attaching the topology I used.


> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A null is propagated to the join result when the foreign key changes
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>  
> This bug doesn't exist on versions 3.6.0 and below.
>  
> I believe the issue comes from the line 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where we propagate the deletion in the two scenarios above
>  
> Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Assigned] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-03-22 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-16394:
---

Assignee: Ayoub Omari

> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple foreign key join on left-topic's foreignKey field which 
> returns the value in right-topic.
>  
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A null is propagated to the join result when the foreign key changes
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>  
> This bug doesn't exist on versions 3.6.0 and below.
>  
> I believe the issue comes from the line 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where we propagate the deletion in the two scenarios above
>  
> Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-03-20 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16394:
---

 Summary: ForeignKey LEFT join propagates null value on foreignKey 
change
 Key: KAFKA-16394
 URL: https://issues.apache.org/jira/browse/KAFKA-16394
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
 Attachments: ForeignJoinTest.scala, JsonSerde.scala

We have two topics : _left-topic[String, LeftRecord]_ and _right-topic[String, 
String]_

where _LeftRecord_ :
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
we do a simple foreign key join on left-topic's foreignKey field which returns 
the value in right-topic.

 

+*Scenario1: change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.

 

This bug doesn't exist on versions 3.6.0 and below.

 

I believe the issue comes from the line 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where we propagate the deletion in the two scenarios above

 

Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16343) Improve tests of streams foreignkeyjoin package

2024-03-05 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16343:

Summary: Improve tests of streams foreignkeyjoin package  (was: Improve 
tests of streams foreignkey package)

> Improve tests of streams foreignkeyjoin package
> ---
>
> Key: KAFKA-16343
> URL: https://issues.apache.org/jira/browse/KAFKA-16343
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> Some classes are not tested in streams foreignkey package, such as 
> SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. 
> Corresponding tests should be added.
> The class ForeignTableJoinProcessorSupplierTest should be renamed as it is 
> not testing ForeignTableJoinProcessor, but rather 
> SubscriptionJoinProcessorSupplier.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16343) Improve tests of streams foreignkeyjoin package

2024-03-05 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-16343:

Description: 
Some classes in the streams foreignkeyjoin package are not tested, such as 
SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. 
Corresponding tests should be added.

The class ForeignTableJoinProcessorSupplierTest should be renamed as it is not 
testing ForeignTableJoinProcessor, but rather SubscriptionJoinProcessorSupplier.

 

  was:
Some classes are not tested in streams foreignkey package, such as 
SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. 
Corresponding tests should be added.

The class ForeignTableJoinProcessorSupplierTest should be renamed as it is not 
testing ForeignTableJoinProcessor, but rather SubscriptionJoinProcessorSupplier.

 


> Improve tests of streams foreignkeyjoin package
> ---
>
> Key: KAFKA-16343
> URL: https://issues.apache.org/jira/browse/KAFKA-16343
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
>
> Some classes are not tested in streams foreignkeyjoin package, such as 
> SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. 
> Corresponding tests should be added.
> The class ForeignTableJoinProcessorSupplierTest should be renamed as it is 
> not testing ForeignTableJoinProcessor, but rather 
> SubscriptionJoinProcessorSupplier.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16343) Improve tests of streams foreignkey package

2024-03-04 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16343:
---

 Summary: Improve tests of streams foreignkey package
 Key: KAFKA-16343
 URL: https://issues.apache.org/jira/browse/KAFKA-16343
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
Assignee: Ayoub Omari


Some classes are not tested in streams foreignkey package, such as 
SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. 
Corresponding tests should be added.

The class ForeignTableJoinProcessorSupplierTest should be renamed as it is not 
testing ForeignTableJoinProcessor, but rather SubscriptionJoinProcessorSupplier.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-20 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-14747:
---

Assignee: Ayoub Omari  (was: Koma Zhang)

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Ayoub Omari
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: if the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record when this happens. We should 
> consider recording this using the existing "dropped record" sensor, or maybe 
> add a new sensor.
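
A minimal sketch of the hash check described above (hypothetical, simplified 
types; not the actual processor code), showing where a discarded response 
would hit the sensor this ticket asks for:
{code:scala}
object SubscriptionResponseCheck {
  // Hypothetical, simplified response type for illustration only.
  final case class Response(originalHash: Array[Byte], rightValue: String)

  def process(
      key: String,
      response: Response,
      currentLeftHash: Array[Byte],       // hash of the current left-hand side record
      forward: (String, String) => Unit,  // emits the join result downstream
      recordDropped: () => Unit           // the "dropped record" sensor this ticket asks for
  ): Unit =
    if (java.util.Arrays.equals(response.originalHash, currentLeftHash))
      forward(key, response.rightValue)   // hashes match: response is current, process it
    else
      recordDropped()                     // stale response: drop it and record the drop
}{code}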



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-20 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818919#comment-17818919
 ] 

Ayoub Omari commented on KAFKA-14747:
-

Picking this up since [~kma] didn't respond :)

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: if the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record when this happens. We should 
> consider recording this using the existing "dropped record" sensor, or maybe 
> add a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14747) FK join should record discarded subscription responses

2024-02-07 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815424#comment-17815424
 ] 

Ayoub Omari commented on KAFKA-14747:
-

I see that the ticket has been open for a while. May I take it over?

[~mjsax] I have one question: I saw that we don't test the dropped-records 
sensor count within the tests of processor nodes (fkResponseJoin, 
KTableKTableJoin, etc.). Is it because we have no way to access or mock 
StreamsMetrics from there?

> FK join should record discarded subscription responses
> --
>
> Key: KAFKA-14747
> URL: https://issues.apache.org/jira/browse/KAFKA-14747
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Koma Zhang
>Priority: Minor
>  Labels: beginner, newbie
>
> FK-joins are subject to a race condition: if the left-hand side record is 
> updated, a subscription is sent to the right-hand side (including a hash 
> value of the left-hand side record), and the right-hand side might send back 
> join responses (also including the original hash). The left-hand side only 
> processes the responses if the returned hash matches the current hash of the 
> left-hand side record, because a different hash implies that the left-hand 
> side record was updated in the meantime (including sending a new 
> subscription to the right-hand side), and thus the data is stale and the 
> response should not be processed (joining the response to the new record 
> could lead to incorrect results).
> A similar thing can happen on a right-hand side update that triggers a 
> response, which might be dropped if the left-hand side record was updated in 
> parallel.
> While the behavior is correct, we don't record when this happens. We should 
> consider recording this using the existing "dropped record" sensor, or maybe 
> add a new sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15625) Do not flush global state store at each commit

2024-02-02 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-15625:
---

Assignee: Ayoub Omari

> Do not flush global state store at each commit
> --
>
> Key: KAFKA-15625
> URL: https://issues.apache.org/jira/browse/KAFKA-15625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Ayoub Omari
>Priority: Major
>
> Global state stores are flushed at each commit. While that is not a big issue 
> with at-least-once processing mode since the commit interval is by default 
> 30s, it might become an issue with EOS where the commit interval is 200ms by 
> default.
> One option would be to flush and checkpoint global state stores when the 
> delta of the content exceeds a given threshold as we do for other stores. See 
> https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15625) Do not flush global state store at each commit

2024-02-02 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813682#comment-17813682
 ] 

Ayoub Omari commented on KAFKA-15625:
-

[~cadonna] I am assigning myself to the ticket for now. Please let me know if 
you have any thoughts.

> Do not flush global state store at each commit
> --
>
> Key: KAFKA-15625
> URL: https://issues.apache.org/jira/browse/KAFKA-15625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> Global state stores are flushed at each commit. While that is not a big issue 
> with at-least-once processing mode since the commit interval is by default 
> 30s, it might become an issue with EOS where the commit interval is 200ms by 
> default.
> One option would be to flush and checkpoint global state stores when the 
> delta of the content exceeds a given threshold as we do for other stores. See 
> https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15625) Do not flush global state store at each commit

2024-02-01 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813099#comment-17813099
 ] 

Ayoub Omari commented on KAFKA-15625:
-

Hi [~cadonna], is this improvement ready for development? I am interested in 
taking it.

One question: do you have any idea about the delta we would consider for 
global state stores?

I see that it is set to 10,000 records for task state stores. My understanding 
is that flushing only helps when restoring the store on the same instance, so 
that it reads fewer records from the source topic. So I am thinking that's a 
fair value?
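
For illustration, a minimal sketch of such delta-based flushing (hypothetical 
names, not the actual implementation), counting writes since the last flush:
{code:scala}
// Hypothetical sketch: flush/checkpoint the global store only once enough
// records were written since the last flush, instead of at every commit.
final class DeltaFlushPolicy(threshold: Long = 10000L) {
  private var writesSinceLastFlush = 0L

  def onWrite(): Unit = writesSinceLastFlush += 1

  // Called at each commit; true only when the delta crossed the threshold.
  def shouldFlush(): Boolean =
    if (writesSinceLastFlush >= threshold) {
      writesSinceLastFlush = 0
      true
    } else false
}{code}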

> Do not flush global state store at each commit
> --
>
> Key: KAFKA-15625
> URL: https://issues.apache.org/jira/browse/KAFKA-15625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> Global state stores are flushed at each commit. While that is not a big issue 
> with at-least-once processing mode since the commit interval is by default 
> 30s, it might become an issue with EOS where the commit interval is 200ms by 
> default.
> One option would be to flush and checkpoint global state stores when the 
> delta of the content exceeds a given threshold as we do for other stores. See 
> https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-09 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-14404:

External issue URL:   (was: https://github.com/apache/kafka/pull/15162)

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partition assignor (Consumer) and the partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-09 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari updated KAFKA-14404:

External issue URL: https://github.com/apache/kafka/pull/15162

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partition assignor (Consumer) and the partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-05 Thread Ayoub Omari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ayoub Omari reassigned KAFKA-14404:
---

Assignee: Ayoub Omari  (was: Sujay Hegde)

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Ayoub Omari
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partition assignor (Consumer) and the partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2024-01-04 Thread Ayoub Omari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17803252#comment-17803252
 ] 

Ayoub Omari commented on KAFKA-14404:
-

Hi [~ableegoldman] & [~sujayopensource],

I see that this ticket has been open for some time now.

May I take it up?

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partition assignor (Consumer) and the partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)