[jira] [Updated] (KAFKA-14834) Improved processor semantics for versioned stores

2023-04-10 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia updated KAFKA-14834:
-
Description: 
With the introduction of versioned state stores in 
[KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
 we should leverage them to provide improved join semantics. 

As described in 
[KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores],
 we will make the following four improvements:
 * stream-table joins will perform a timestamped lookup (using the stream-side 
record timestamp) if the table is versioned
 * table-table joins, including foreign key joins, will not produce new join 
results on out-of-order records (by key) from versioned tables
 * table filters will disable the existing optimization to not send duplicate 
tombstones when applied to a versioned table
 * table aggregations will ignore out-of-order records when aggregating a 
versioned table

  was:
With the introduction of versioned state stores in 
[KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
 we should leverage them to provide improved join semantics. 

As described in 
[KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores],
 we will make the following two improvements:
 * stream-table joins will perform a timestamped lookup (using the stream-side 
record timestamp) if the table is materialized with a versioned store
 * table-table joins, including foreign key joins, will not produce new join 
results on out-of-order records (by key) from tables materialized with 
versioned stores

Summary: Improved processor semantics for versioned stores  (was: 
Improved stream-table and table-table join semantics for versioned stores)

> Improved processor semantics for versioned stores
> -
>
> Key: KAFKA-14834
> URL: https://issues.apache.org/jira/browse/KAFKA-14834
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip, streams
>
> With the introduction of versioned state stores in 
> [KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
>  we should leverage them to provide improved join semantics. 
> As described in 
> [KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores],
>  we will make the following four improvements:
>  * stream-table joins will perform a timestamped lookup (using the 
> stream-side record timestamp) if the table is versioned
>  * table-table joins, including foreign key joins, will not produce new join 
> results on out-of-order records (by key) from versioned tables
>  * table filters will disable the existing optimization to not send duplicate 
> tombstones when applied to a versioned table
>  * table aggregations will ignore out-of-order records when aggregating a 
> versioned table
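
For a concrete sense of the first improvement, here is a minimal sketch of a stream-table join against a versioned table, assuming String serdes and illustrative topic/store names ("stream-input", "table-input", "versioned-table", "join-output"):

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class VersionedJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Materialize the table with a versioned store (KIP-889); the history
        // retention bounds how far back in time lookups may go.
        final KTable<String, String> table = builder.table(
            "table-input",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as(
                Stores.persistentVersionedKeyValueStore("versioned-table", Duration.ofMinutes(10))));

        final KStream<String, String> stream =
            builder.stream("stream-input", Consumed.with(Serdes.String(), Serdes.String()));

        // Because the table is versioned, KIP-914 semantics look up the table
        // value as of the stream-side record's timestamp, not the latest value.
        stream.join(table, (streamValue, tableValue) -> streamValue + "/" + tableValue)
              .to("join-output");
    }
}
{code}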



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14911) Add system tests for rolling upgrade path of KIP-904

2023-04-25 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17716134#comment-17716134
 ] 

Victoria Xia commented on KAFKA-14911:
--

I can help pick this up, in light of the upcoming 3.5 code freeze.

> Add system tests for rolling upgrade path of KIP-904
> 
>
> Key: KAFKA-14911
> URL: https://issues.apache.org/jira/browse/KAFKA-14911
> Project: Kafka
>  Issue Type: Test
>Reporter: Farooq Qaiser
>Priority: Major
> Fix For: 3.5.0
>
>
> As per [~mjsax] comment 
> [here|https://github.com/apache/kafka/pull/10747#pullrequestreview-1376539752],
>  we should add a system test to test the rolling upgrade path for 
> [KIP-904|https://cwiki.apache.org/confluence/x/P5VbDg] which introduces a new 
> serialization format for groupBy internal repartition topics and was 
> implemented as part of https://issues.apache.org/jira/browse/KAFKA-12446 
> There is `StreamsUpgradeTest.java` and `streams_upgrade_test.py` (cf 
> `test_rolling_upgrade_with_2_bounces`) as a starting point.
> Might be best to do a similar thing as for FK-joins, and add a new test 
> variation. 
> The tricky thing about the test is ensuring that the repartition topic is not 
> empty when we do the bounce, so the test should be set up accordingly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14949) Add Streams upgrade tests from AK 3.4

2023-04-27 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14949:


 Summary: Add Streams upgrade tests from AK 3.4
 Key: KAFKA-14949
 URL: https://issues.apache.org/jira/browse/KAFKA-14949
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Victoria Xia


Streams upgrade tests currently only test upgrading from 3.3 and earlier 
versions 
([link|https://github.com/apache/kafka/blob/056657d84d84e116ffc9460872945b4d2b479ff3/tests/kafkatest/tests/streams/streams_application_upgrade_test.py#L30]).
 We should add 3.4 as an "upgrade_from" version into these tests, in light of 
the upcoming 3.5 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-14 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia reassigned KAFKA-13261:


Assignee: Victoria Xia

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-09-14 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia reassigned KAFKA-13268:


Assignee: Victoria Xia

> Add more integration tests for Table Table FK joins with repartitioning
> ---
>
> Key: KAFKA-13268
> URL: https://issues.apache.org/jira/browse/KAFKA-13268
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Victoria Xia
>Priority: Major
>
> We should extend the FK join multipartition integration test with a 
> Repartitioned for:
> 1) just the new partition count
> 2) a custom partitioner
> This is to test whether there's a bug where the internal topics don't pick up 
> a partitioner provided that way.
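
A minimal sketch of the two Repartitioned variations in question, with illustrative String types and partition count:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Repartitioned;

public class RepartitionedVariations {

    // 1) Repartition with just a new partition count.
    static Repartitioned<String, String> countOnly() {
        return Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
            .withNumberOfPartitions(4);
    }

    // 2) Repartition with a custom partitioner; the FK join's internal
    // subscription/response topics must honor this for results to be complete.
    static Repartitioned<String, String> withCustomPartitioner() {
        return Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
            .withStreamPartitioner((topic, key, value, numPartitions) ->
                Math.abs(key.hashCode() % numPartitions))
            .withNumberOfPartitions(4);
    }
}
{code}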



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-14 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415337#comment-17415337
 ] 

Victoria Xia commented on KAFKA-13261:
--

Hey [~vvcephei] [~abellemare] [~guozhang] I had a look at the code and it seems 
to support Adam's theory that the custom partitioners from the repartition() 
step aren't taken into account by the foreign key join. In particular, both the 
subscription sink topic and the response sink topic are created without 
partitioners specified in the StreamSinkNode:

[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1051]
 
[https://github.com/apache/kafka/blob/75795d1ed8402f185e00b5f3aedcc2bcbb914ca9/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1122]

IIUC, this means the default partitioner is used for both topics despite the 
custom partitioners on the source tables, which explains the missing join 
results.

One thing I don't understand: even if we fix this bug by propagating the 
partitioner information from the repartition() step to the foreign key join, 
wouldn't we still have an analogous bug if either of the topics for the source 
tables had custom partitioning logic created from outside Streams (i.e., 
without a repartition() step in the Streams topology)? In this case, Streams 
has no way of determining the partitioning of the source tables, which means we 
need an update to the interface for foreign key joins so that users can specify 
a partitioner to use in order to ensure copartitioning of the subscription and 
response topics with the relevant tables. Is this reasoning sound?

If so, does it make sense to add logic into Streams to propagate information 
about the partitioner from the repartition() step to the foreign key join, or 
would it be better to require users to use the new interface to pass the same 
partitioner from the repartition() step(s) to the foreign key join as well? The 
latter seems more consistent with how copartitioning for joins is typically the 
user's responsibility, and also avoids the need to update Streams with logic 
for tracking partitioners throughout the topology.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}

[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-15 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415792#comment-17415792
 ] 

Victoria Xia commented on KAFKA-13261:
--

Thanks for the confirmation, Matthias and Guozhang! I've opened a small KIP 
with the proposed interface changes to allow users to pass in custom 
partitioners for FK joins: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]

I tried sending an email to the 
[d...@kafka.apache.org|mailto:d...@kafka.apache.org] mailing list to start 
discussion but either it's taking a while to send or my permissions aren't yet 
set up correctly. Hopefully it will be ready for discussion soon.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
> Attachments: KafkaTest.java
>
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-17 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415792#comment-17415792
 ] 

Victoria Xia edited comment on KAFKA-13261 at 9/17/21, 3:12 PM:


Thanks for the confirmation, Matthias and Guozhang! I've opened a small KIP 
with the proposed interface changes to allow users to pass in custom 
partitioners for FK joins: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]

I tried sending an email to the 
[d...@kafka.apache.org|mailto:d...@kafka.apache.org] mailing list to start 
discussion but either it's taking a while to send or my permissions aren't yet 
set up correctly. Hopefully it will be ready for discussion soon.

UPDATE: Email for KIP discussion has been sent.


was (Author: vcrfxia):
Thanks for the confirmation, Matthias and Guozhang! I've opened a small KIP 
with the proposed interface changes to allow users to pass in custom 
partitioners for FK joins: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]

I tried sending an email to the 
[d...@kafka.apache.org|mailto:d...@kafka.apache.org] mailing list to start 
discussion but either it's taking a while to send or my permissions aren't yet 
set up correctly. Hopefully it will be ready for discussion soon.

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip
> Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-17 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416776#comment-17416776
 ] 

Victoria Xia commented on KAFKA-13261:
--

Hey [~mjsax] [~guozhang], after opening the KIP above I noticed a slight 
"problem" with the interfaces for users to pass custom partitioners. Can you 
double-check my reasoning below?

The "problem" with the interfaces in the KIP is that interfaces currently ask 
users to provide {{StreamPartitioner thisPartitioner }}(representing the 
partitioning strategy for the right/foreign-key table) – as these are the 
partitioners that users should know – but the FK join implementation actually 
needs a partitioner of the form {{StreamPartitioner>}} instead of {{otherPartitioner}}, and a partitioner of 
the form {{StreamPartitioner>}} instead of 
{{thisPartitioner}}, based on the actual messages being passed in the 
subscription and response topics.

It doesn't make sense to ask users to pass these other partitioners since 
{{SubscriptionWrapper}} is an internal implementation detail. What we really 
want is to require that any custom partitioners select a partition based only 
on the message key, without taking the message value into consideration. I 
think this is a reasonable requirement for the right/foreign-key table since 
the subscription store mechanism (in the FK join implementation) won't work at 
all unless all messages with the same (foreign) key are always sent to the same 
partition. We don't technically need to require this for the left table since, 
if we wanted to, we could send the original value along with the subscription 
in order to ensure that the response is routed back to the correct partition, 
but this seems unnecessarily complicated and also bloats the size of the 
subscription store and topic.

As such, I think we should require that any custom partitioner used for tables 
must partition messages based only on message key (and not value), in order to 
be used in FK joins. Do you agree?

If so, we could make this requirement explicit in the FK join interfaces by 
asking users for {{StreamPartitioner<K, Void> thisPartitioner}} and 
{{StreamPartitioner<KO, Void> otherPartitioner}} instead of the original 
partitioners. Or we could keep the interfaces as is ({{StreamPartitioner<K, V> 
thisPartitioner}} and {{StreamPartitioner<KO, VO> otherPartitioner}}), 
since users probably already have these handy anyway, and leave a note in the 
javadocs to explain the requirement. I have a preference for the latter but am 
curious what you think. Thanks!
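
To make the key-only requirement concrete, here is a sketch of a partitioner of the {{StreamPartitioner<K, Void>}} shape discussed above (the String key type is illustrative): the value parameter is present but must not be consulted, so all messages with the same key land on the same partition regardless of payload.

{code:java}
import org.apache.kafka.streams.processor.StreamPartitioner;

public class KeyOnlyPartitioner {

    // Partitions strictly on the message key; ignoring the value is what
    // makes the partitioner safe for the FK join's internal subscription
    // and response topics.
    static <V> StreamPartitioner<String, V> keyOnly() {
        return (topic, key, value, numPartitions) ->
            Math.abs(key.hashCode() % numPartitions);
    }
}
{code}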

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip
> Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}

[jira] [Comment Edited] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-09-17 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416776#comment-17416776
 ] 

Victoria Xia edited comment on KAFKA-13261 at 9/17/21, 11:46 PM:
-

Hey [~mjsax] [~guozhang], after opening the KIP above I noticed a slight 
"problem" with the interfaces for users to pass custom partitioners. Can you 
double-check my reasoning below?

The "problem" with the interfaces in the KIP is that interfaces currently ask 
users to provide {{StreamPartitioner thisPartitioner }}(representing the 
partitioning strategy for the right/foreign-key table) – as these are the 
partitioners that users should know – but the FK join implementation actually 
needs a partitioner of the form {{StreamPartitioner>}} instead of {{otherPartitioner}}, and a partitioner of 
the form {{StreamPartitioner>}} instead of 
{{thisPartitioner}}, based on the actual messages being passed in the 
subscription and response topics.

It doesn't make sense to ask users to pass these other partitioners since 
{{SubscriptionWrapper}} and {{SubscriptionResponseWrapper}} are internal 
implementation details. What we really want is to require that any custom 
partitioners select a partition based only on the message key, without taking 
the message value into consideration. I think this is a reasonable requirement 
for the right/foreign-key table since the subscription store mechanism (in the 
FK join implementation) won't work at all unless all messages with the same 
(foreign) key are always sent to the same partition. We don't technically need 
to require this for the left table since, if we wanted to, we could send the 
original value along with the subscription in order to ensure that the response 
is routed back to the correct partition, but this seems unnecessarily 
complicated and also bloats the size of the subscription store and topic.

As such, I think we should require that any custom partitioner used for tables 
must partition messages based only on message key (and not value), in order to 
be used in FK joins. Do you agree?

If so, we could make this requirement explicit in the FK join interfaces by 
asking users for {{StreamPartitioner<K, Void> thisPartitioner}} and 
{{StreamPartitioner<KO, Void> otherPartitioner}} instead of the original 
partitioners. Or we could keep the interfaces as is ({{StreamPartitioner<K, V> 
thisPartitioner}} and {{StreamPartitioner<KO, VO> otherPartitioner}}), 
since users probably already have these handy anyway, and leave a note in the 
javadocs to explain the requirement. I have a preference for the latter but am 
curious what you think. Thanks!


was (Author: vcrfxia):
Hey [~mjsax] [~guozhang], after opening the KIP above I noticed a slight 
"problem" with the interfaces for users to pass custom partitioners. Can you 
double-check my reasoning below?

The "problem" with the interfaces in the KIP is that interfaces currently ask 
users to provide {{StreamPartitioner thisPartitioner }}(representing the 
partitioning strategy for the right/foreign-key table) – as these are the 
partitioners that users should know – but the FK join implementation actually 
needs a partitioner of the form {{StreamPartitioner>}} instead of {{otherPartitioner}}, and a partitioner of 
the form {{StreamPartitioner>}} instead of 
{{thisPartitioner}}, based on the actual messages being passed in the 
subscription and response topics.

It doesn't make sense to ask users to pass these other partitioners since 
{{SubscriptionWrapper}} is an internal implementation detail. What we really 
want is to require that any custom partitioners select a partition based only 
on the message key, without taking the message value into consideration. I 
think this is a reasonable requirement for the right/foreign-key table since 
the subscription store mechanism (in the FK join implementation) won't work at 
all unless all messages with the same (foreign) key are always sent to the same 
partition. We don't technically need to require this for the left table since, 
if we wanted to, we could send the original value along with the subscription 
in order to ensure that the response is routed back to the correct partition, 
but this seems unnecessarily complicated and also bloats the size of the 
subscription store and topic.

As such, I think we should require that any custom partitioner used for tables 
must partition messages based only on message key (and not value), in order to 
be used in FK joins. Do you agree?

If so, we could make this requirement explicit in the FK join interfaces by 
asking users for {{StreamPartitioner<K, Void> thisPartitioner}} and 
{{StreamPartitioner<KO, Void> otherPartitioner}} instead of the original 
partitioners. Or we could keep the interfaces as is ({{StreamPartitioner<K, V> 
thisPartitioner}} and {{StreamPartitioner<KO, VO> otherPartitioner}}), 
since users probably already have these handy anyway, and leave a note in the 
javadocs to explain the requirement. I have a preference for the latter but am 
curious what you think. Thanks!

[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-10-01 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423388#comment-17423388
 ] 

Victoria Xia commented on KAFKA-13261:
--

Hi [~xnix], KIP-775 for adding interfaces to support custom partitioners in 
foreign key joins has been accepted and I've opened a PR with an 
implementation. The integration test you shared for demonstrating the feature 
gap was super convenient to work with, and I've also checked it in as part of 
the PR. Please let me know if you'd like to be added as a contributor on the 
PR. Thanks again for raising this gap! :)
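
For reference, the accepted interface passes both partitioners at the join site via {{TableJoined}}. A usage sketch, reusing the reporter's illustrative KeyA/EventA/EventB types and key-only partitioners that match the source topics' partitioning:

{code:java}
// Assumes tableA: KTable<KeyA, EventA> and tableB: KTable<String, EventB>,
// with KeyA.getKeyB() returning the foreign key (illustrative types).
KTable<KeyA, String> joined = tableA.join(
    tableB,
    EventA::getKeyB,                              // foreign-key extractor
    (eventA, eventB) -> eventA + "/" + eventB,    // value joiner
    TableJoined.with(
        // this-side partitioner (key-only: the Void value is never read)
        (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions,
        // other-side partitioner for the foreign-key table
        (topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions),
    Materialized.as("join.ab"));
{code}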

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip
> Fix For: 3.1.0
>
> Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-10-01 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423390#comment-17423390
 ] 

Victoria Xia commented on KAFKA-13268:
--

Hey [~guozhang] we've already got 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java for testing foreign 
key joins with differing numbers of partitions on the source tables, and 
[https://github.com/apache/kafka/pull/11368] is adding coverage for source 
tables with custom partitioners. Is this the extent of the test coverage you 
had in mind with this ticket, or were you imagining something else? Thanks! 

> Add more integration tests for Table Table FK joins with repartitioning
> ---
>
> Key: KAFKA-13268
> URL: https://issues.apache.org/jira/browse/KAFKA-13268
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Victoria Xia
>Priority: Major
>
> We should extend the FK join multipartition integration test with a 
> Repartitioned for:
> 1) just the new partition count
> 2) a custom partitioner
> This is to test whether there's a bug where the internal topics don't pick up 
> a partitioner provided that way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13261) KTable to KTable foreign key join loose events when using several partitions

2021-10-08 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17426245#comment-17426245
 ] 

Victoria Xia commented on KAFKA-13261:
--

Hi [~xnix] , of course! It's no trouble at all. Could you please share an email 
address to include as a coauthor on the commit when the PR is merged? 
[https://docs.github.com/en/github/committing-changes-to-your-project/creating-and-editing-commits/creating-a-commit-with-multiple-authors#required-co-author-information]
 

> KTable to KTable foreign key join loose events when using several partitions
> 
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0, 2.7.1
>Reporter: Tomas Forsman
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip
> Fix For: 3.1.0
>
> Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems like it has something to do with the foreign key join in combination 
> with several partitions.
> One suspicion would be that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, EventB> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13373) ValueTransformerWithKeySupplier doesn't work with store()

2021-10-19 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia updated KAFKA-13373:
-
Labels: newbie  (was: )

> ValueTransformerWithKeySupplier doesn't work with store()
> -
>
> Key: KAFKA-13373
> URL: https://issues.apache.org/jira/browse/KAFKA-13373
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Anatoly Tsyganenko
>Priority: Minor
>  Labels: newbie
>
> I'm trying to utilize the stores() method in ValueTransformerWithKeySupplier like 
> this:
>  
> {code:java}
> public final class CustomSupplier implements 
>         ValueTransformerWithKeySupplier<Windowed<String>, JsonNode, JsonNode> {
>     private final String storeName = "my-store";
>
>     public Set<StoreBuilder<?>> stores() {
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         final Serde<String> stringSerde = Serdes.String();
>         final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> store = 
>             Stores.timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
>                 stringSerde, jsonSerde).withLoggingDisabled();
>         return Collections.singleton(store);
>     }
>
>     @Override
>     public ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode> get() {
>         return new ValueTransformerWithKey<Windowed<String>, JsonNode, JsonNode>() {
>             private ProcessorContext context;
>             private TimestampedKeyValueStore<String, JsonNode> store;
>
>             @Override
>             public void init(final ProcessorContext context) {
>                 this.store = context.getStateStore(storeName);
>                 this.context = context;
>             }
>             // ...
>         };
>     }
> }{code}
>  
> But got next error for line "this.store = context.getStateStore(storeName);" 
> in init():
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> KTABLE-TRANSFORMVALUES-08 has no access to StateStore my-store as the 
> store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.{code}
>  
> The same code works perfectly with Transform, or when I add the store to the 
> builder. Looks like something is wrong when ConnectedStoreProvider and 
> ValueTransformerWithKeySupplier are used together.
>  
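
For reference, the workaround the reporter mentions, registering the store on the builder and connecting it by name, looks roughly like this (a sketch only; the serdes, supplier, and store name are the ones from the snippet above):

{code:java}
// Register the store explicitly and connect it to the transformer by name.
final StoreBuilder<TimestampedKeyValueStore<String, JsonNode>> storeBuilder =
    Stores.timestampedKeyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("my-store"), stringSerde, jsonSerde)
        .withLoggingDisabled();
builder.addStateStore(storeBuilder);

// Passing the store name here connects the store to the processor, which is
// what the stores() path fails to do in this report.
table.transformValues(new CustomSupplier(), "my-store");
{code}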



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-14723) Do not write expired store records to changelog

2023-02-15 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14723:


 Summary: Do not write expired store records to changelog
 Key: KAFKA-14723
 URL: https://issues.apache.org/jira/browse/KAFKA-14723
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Victoria Xia


Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segments stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is because 
whether the records are expired or not is only tracked at the innermost store 
layer, and not any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not (a sketch of this option follows the list), or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.
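
A sketch of the first option, using hypothetical interfaces rather than the actual Streams store internals:

{code:java}
// Hypothetical inner-store contract: put() reports whether the record was
// actually stored or dropped as expired. Not the real Streams interface.
interface ExpiringStore<K, V> {
    boolean put(K key, V value, long timestamp);
}

// Hypothetical changelogging wrapper: only records the inner store accepted
// are written to the changelog topic, so expired records never reach it.
class ChangeLoggingLayer<K, V> implements ExpiringStore<K, V> {
    private final ExpiringStore<K, V> inner;

    ChangeLoggingLayer(final ExpiringStore<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public boolean put(final K key, final V value, final long timestamp) {
        final boolean stored = inner.put(key, value, timestamp);
        if (stored) {
            logChange(key, value, timestamp); // write to the changelog topic
        }
        return stored;
    }

    private void logChange(final K key, final V value, final long timestamp) {
        // send to the store's changelog topic (elided)
    }
}
{code}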



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14723) Do not write expired store records to changelog

2023-02-15 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia updated KAFKA-14723:
-
Description: 
Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segments stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is because 
whether the records are expired or not is only tracked at the innermost store 
layer, and not any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.

  was:
Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segments stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is because 
the whether records are expired or not is only tracked at the innermost store 
layer, and not any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.


> Do not write expired store records to changelog
> ---
>
> Key: KAFKA-14723
> URL: https://issues.apache.org/jira/browse/KAFKA-14723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Victoria Xia
>Priority: Major
>
> Window stores and versioned stores both have concepts of "retention" and 
> "expiration." Records which are expired are not written to the store, e.g., 
> [this 
> example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
>  for segments stores. However, these expired records are still written to the 
> changelog topic, in the case of persistent stores. This does not cause any 
> problems because the records are once again omitted from the store during 
> restore, but it is inefficient. It'd be good to avoid writing expired records 
> to the changelog topic in the first place. Another benefit is that doing so 
> would allow us to simplify the restoration code for versioned stores.

[jira] [Created] (KAFKA-14834) Improved stream-table and table-table join semantics for versioned stores

2023-03-22 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14834:


 Summary: Improved stream-table and table-table join semantics for 
versioned stores
 Key: KAFKA-14834
 URL: https://issues.apache.org/jira/browse/KAFKA-14834
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Victoria Xia
Assignee: Victoria Xia


With the introduction of versioned state stores in 
[KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
 we should leverage them to provide improved join semantics. 

As described in 
[KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores],
 we will make the following two improvements:
 * stream-table joins will perform a timestamped lookup (using the stream-side 
record timestamp) if the table is materialized with a versioned store
 * table-table joins, including foreign key joins, will not produce new join 
results on out-of-order records (by key) from tables materialized with 
versioned stores



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14864) Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy

2023-03-28 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14864:


 Summary: Memory leak in KStreamWindowAggregate with 
ON_WINDOW_CLOSE emit strategy
 Key: KAFKA-14864
 URL: https://issues.apache.org/jira/browse/KAFKA-14864
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Victoria Xia
Assignee: Victoria Xia
 Fix For: 3.5.0


The Streams DSL processor implementation for the ON_WINDOW_CLOSE emit strategy 
during KStream windowed aggregations opens a key-value iterator but does not 
call `close()` on it 
([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java#L203]),
 despite the Javadocs for the iterator making clear that users must do so in 
order to release resources 
([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java#L27]).
  

I discovered this bug while running load testing benchmarks and noticed that 
some runs were sporadically hitting OOMs, so it is definitely possible to hit 
this in practice.
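
The general fix pattern is to open the iterator in a try-with-resources block. A minimal sketch, assuming a timestamped window store (the store and time bounds are illustrative, not the exact processor code):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class EmitOnWindowClose {

    // KeyValueIterator extends Closeable, so try-with-resources releases the
    // underlying (e.g., RocksDB) resources even if iteration exits early.
    static <K, V> void emitClosedWindows(final TimestampedWindowStore<K, V> store,
                                         final long emitFrom, final long emitTo) {
        try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> iterator =
                 store.fetchAll(emitFrom, emitTo)) {
            while (iterator.hasNext()) {
                final KeyValue<Windowed<K>, ValueAndTimestamp<V>> record = iterator.next();
                // ... forward the final result for record.key's window ...
            }
        }
    }
}
{code}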



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14491) Introduce Versioned Key-Value Stores to Kafka Streams

2022-12-14 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14491:


 Summary: Introduce Versioned Key-Value Stores to Kafka Streams
 Key: KAFKA-14491
 URL: https://issues.apache.org/jira/browse/KAFKA-14491
 Project: Kafka
  Issue Type: Improvement
Reporter: Victoria Xia
Assignee: Victoria Xia


The key-value state stores used by Kafka Streams today maintain only the latest 
value associated with each key. In order to support applications which require 
access to older record versions, Kafka Streams should have versioned state 
stores. Versioned state stores are similar to key-value stores except they can 
store multiple record versions for a single key. An example use case for 
versioned key-value stores is in providing proper temporal join semantics for 
stream-tables joins with regards to out-of-order data.

See KIP for more: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores
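
A sketch of the point-lookup contract the KIP introduces, using the VersionedKeyValueStore and VersionedRecord types from KIP-889 (the keys, values, and timestamps are illustrative):

{code:java}
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

public class VersionedStoreSketch {

    // In a real topology the store would come from the ProcessorContext;
    // here it is passed in to keep the sketch self-contained.
    static void demo(final VersionedKeyValueStore<String, String> store) {
        store.put("k", "v1", 100L); // version valid from timestamp 100
        store.put("k", "v2", 200L); // new version valid from timestamp 200

        final VersionedRecord<String> latest = store.get("k");     // "v2"
        final VersionedRecord<String> asOf = store.get("k", 150L); // "v1", the version as of ts=150
    }
}
{code}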



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-12366) Performance regression in stream-table joins on trunk

2021-02-23 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-12366:


 Summary: Performance regression in stream-table joins on trunk
 Key: KAFKA-12366
 URL: https://issues.apache.org/jira/browse/KAFKA-12366
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Victoria Xia


Stream-table join benchmarks have revealed a significant performance regression 
on trunk as compared to the latest release version. We should investigate as a 
blocker prior to the 2.8 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10008) Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 10.13.6

2020-05-15 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-10008:


 Summary: Symbol not found when running Kafka Streams with RocksDB 
dependency on MacOS 10.13.6
 Key: KAFKA-10008
 URL: https://issues.apache.org/jira/browse/KAFKA-10008
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
 Environment: MacOS 10.13.6
Reporter: Victoria Xia


In bumping the RocksDB dependency version from 5.18.3 to 5.18.4 on trunk, Kafka 
Streams apps that require initializing RocksDB state stores fail on MacOS 
10.13.6 with
{code:java}
dyld: lazy symbol binding failed: Symbol not found: chkstk_darwin
  Referenced from: 
/private/var/folders/y4/v3q4tgb559sb0x6kwpll19bmgn/T/librocksdbjni4028367213086899694.jnilib
 (which was built for Mac OS X 10.15)
  Expected in: /usr/lib/libSystem.B.dylib
{code}
as a result of [https://github.com/facebook/rocksdb/issues/6852]

2.5.0 is unaffected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10008) Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 10.13.6

2020-05-15 Thread Victoria Xia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108848#comment-17108848
 ] 

Victoria Xia commented on KAFKA-10008:
--

Ah, thanks for the context. Sounds like the decision then is to knowingly break 
compatibility with MacOS 10.13.6 in Kafka Streams 2.6? Definitely a bummer :( 

> Symbol not found when running Kafka Streams with RocksDB dependency on MacOS 
> 10.13.6
> 
>
> Key: KAFKA-10008
> URL: https://issues.apache.org/jira/browse/KAFKA-10008
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
> Environment: MacOS 10.13.6
>Reporter: Victoria Xia
>Priority: Major
>
> In bumping the RocksDB dependency version from 5.18.3 to 5.18.4 on trunk, 
> Kafka Streams apps that require initializing RocksDB state stores fail on 
> MacOS 10.13.6 with
> {code:java}
> dyld: lazy symbol binding failed: Symbol not found: chkstk_darwin
>   Referenced from: 
> /private/var/folders/y4/v3q4tgb559sb0x6kwpll19bmgn/T/librocksdbjni4028367213086899694.jnilib
>  (which was built for Mac OS X 10.15)
>   Expected in: /usr/lib/libSystem.B.dylib
> {code}
> as a result of [https://github.com/facebook/rocksdb/issues/6852]
> 2.5.0 is unaffected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)