[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423388#comment-17423388
 ] 

Victoria Xia commented on KAFKA-13261:
--------------------------------------

Hi [~xnix], KIP-775 for adding interfaces to support custom partitioners in 
foreign key joins has been accepted and I've opened a PR with an 
implementation. The integration test you shared for demonstrating the feature 
gap was super convenient to work with, and I've also checked it in as part of 
the PR. Please let me know if you'd like to be added as a contributor on the 
PR. Thanks again for raising this gap! :)

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>              Labels: kip
>             Fix For: 3.1.0
>
>         Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and steams A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTable and do a foreign key join on from A to B
> When doing this, all messages does not end up in the output topic.
> Repartitioning both to only use 1 partition each solve the problem so it seem 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B",  
> stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA> aMaterialized(String name) {
>   Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
>     Repartitioned<DriverPeriod, DriverCosts> repartitioned = 
> Repartitioned.as("driverperiod");
>     return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<DriverPeriod, DriverCosts> 
> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> 
> joinMaterialized(String name) {
>     Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> 
> table = Materialized.as(name);
>     return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to