[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424158#comment-17424158 ]
Tomas Forsman commented on KAFKA-13261:
---------------------------------------

[~vcrfxia] thank you so much for solving this issue; it has been very interesting to follow the progress and see how the process works. Great job :)
I'm glad the integration test could be of use!
If it causes no trouble, it would be an honor to be added as a contributor.

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
> Key: KAFKA-13261
> URL: https://issues.apache.org/jira/browse/KAFKA-13261
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.8.0, 2.7.1
> Reporter: Tomas Forsman
> Assignee: Victoria Xia
> Priority: Major
> Labels: kip
> Fix For: 3.1.0
>
> Attachments: KafkaTest.java
>
>
> KIP-775:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>
> Two incoming streams, A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it seems
> to have something to do with the foreign key join in combination with
> several partitions.
> One suspicion is that it is not possible to define which partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>               Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned
>         .withKeySerde(Serde.of(KeyA.class))
>         .withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> // Partition by the hash of the foreign-key part of the composite key.
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) ->
>         Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
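
To make the suspicion in the description concrete, here is a minimal, Kafka-free sketch of how two routing rules can disagree about where a record with composite key [a, b] belongs. It is an illustration only, not the Kafka Streams implementation: the class and method names are hypothetical, and `Objects.hash` is an assumed stand-in for whatever key-based routing the join applies internally (Kafka's default partitioner hashes the serialized key bytes, which this sketch does not reproduce).

```java
import java.util.Objects;

final class PartitionMismatchSketch {

    // Math.floorMod always yields a value in [0, numPartitions),
    // even for a negative hash such as Integer.MIN_VALUE.
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    // Custom routing in the spirit of the issue: only the foreign-key part b
    // of the composite key [a, b] decides the partition.
    static int byForeignKey(String a, String b, int numPartitions) {
        return partitionFor(b.hashCode(), numPartitions);
    }

    // ASSUMPTION: simplified stand-in for routing on the whole composite key.
    static int byWholeKey(String a, String b, int numPartitions) {
        return partitionFor(Objects.hash(a, b), numPartitions);
    }

    // Counts generated sample keys for which the two rules pick different partitions.
    static int countMismatches(int numKeys, int numPartitions) {
        int mismatches = 0;
        for (int i = 0; i < numKeys; i++) {
            String a = "a-" + i;
            String b = "b-" + (i % 10);
            if (byForeignKey(a, b, numPartitions) != byWholeKey(a, b, numPartitions)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("4 partitions, keys routed differently: " + countMismatches(1000, 4));
        System.out.println("1 partition, keys routed differently: " + countMismatches(1000, 1));
    }
}
```

With a single partition both rules necessarily agree, which matches the observation that repartitioning to 1 partition makes the problem disappear. With several partitions they can diverge, so a message routed by one rule may be looked for on a partition chosen by the other.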