[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417203#comment-17417203 ]

Matthias J. Sax commented on KAFKA-13261:
-----------------------------------------

Let's keep the discussion on the mailing list. But I agree with the problem you describe. The FK join can only work if both tables are partitioned solely by their respective keys. I don't think it will become a problem in practice, but it's worth pointing out in the JavaDocs.

Furthermore, this problem seems to be very fundamental and I don't think we could solve it. As you pointed out, we don't have access to the value of the foreign key and thus could not pass it into the partitioner. I also agree that we should not send the full left-hand value to the right side just for this purpose. As a matter of fact, in the original FK-KIP, there was a discussion about sending the left value and computing the join result on the right side to avoid the response topics; this idea was rejected for various reasons.

After I read the first paragraphs, my thought was also "let's make the value-type {{Void}}", so I am happy that you propose the exact same thing! I am on board with it! I'll reply to the mailing list, too.

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>              Labels: kip
>         Attachments: KafkaTest.java
>
>
> KIP-775:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it
> seems to have something to do with the foreign key join in combination with
> several partitions.
> One suspicion is that it is not possible to define which partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
>
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         // Repartition A to 4 partitions with a custom partitioner (see below).
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         // Foreign key join from A to B; the foreign key is extracted from A's value.
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> // Partitions A by the hash of the foreign-key part of its composite key only.
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) ->
>         Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
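
For illustration, here is a minimal sketch of the partitioning mismatch described above: a table partitioned by a hash of only the foreign-key part of its key can place a record on a different partition than partitioning by the full key would. Everything in it is an assumption made for the example. The class and method names are hypothetical, the composite key is serialized as a plain "a:b" string, and {{Arrays.hashCode}} stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key bytes. It does not use the Kafka Streams API and is not the code path of the join itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; none of these names come from Kafka Streams.
class PartitionMismatchSketch {

    // Assumed serialization of the composite key [a, b].
    static byte[] serialize(String a, String b) {
        return (a + ":" + b).getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for key-based partitioning: hash of the whole serialized key.
    // Kafka's default partitioner uses murmur2; Arrays.hashCode is a simplification.
    static int keyPartition(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Similar in spirit to the custom partitioner in the report:
    // only the foreign-key part [b] of the key is hashed.
    static int customPartition(String b, int numPartitions) {
        return Math.floorMod(b.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String a : new String[] {"a1", "a2", "a3"}) {
            int byKey = keyPartition(serialize(a, "b1"), numPartitions);
            int byForeignKey = customPartition("b1", numPartitions);
            System.out.printf("key=[%s, b1] keyPartition=%d customPartition=%d%s%n",
                a, byKey, byForeignKey, byKey == byForeignKey ? "" : "  <-- mismatch");
        }
    }
}
```

With these stand-in hashes, the key [a1, b1] lands on different partitions under the two schemes. Whenever the two schemes disagree like this, anything routed back by the full key can arrive at a task that does not hold the matching row, which is consistent with the lost events in the report.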