[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415690#comment-17415690 ]
Matthias J. Sax edited comment on KAFKA-13261 at 9/15/21, 6:56 PM: ------------------------------------------------------------------- I would prefer to do a KIP and allow user to pass in a custom partitioner. If we really get follow up requests, we could extend the DSL logic to auto-forward an upstream partitioner later. Overall, it seems not that there is no bug in the strong sense, but a missing feature. We never designed FK-join to support custom partitioning. Might be worth to update the docs for 3.0 and earlier to point out this limitation. was (Author: mjsax): I would prefer to do a KIP and allow user to pass in a custom partitioner. If we really get follow up requests, we could extend the DSL logic to auto-forward an upstream partitioner later. > KTable to KTable foreign key join loose events when using several partitions > ---------------------------------------------------------------------------- > > Key: KAFKA-13261 > URL: https://issues.apache.org/jira/browse/KAFKA-13261 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.8.0, 2.7.1 > Reporter: Tomas Forsman > Assignee: Victoria Xia > Priority: Major > Attachments: KafkaTest.java > > > Two incoming streams A and B. > Stream A uses a composite key [a, b] > Stream B has key [b] > Stream B has 4 partitions and steams A has 1 partition. > What we try to do is repartition stream A to have 4 partitions too, then put > both A and B into KTable and do a foreign key join on from A to B > When doing this, all messages does not end up in the output topic. > Repartitioning both to only use 1 partition each solve the problem so it seem > like it has something to do with the foreign key join in combination with > several partitions. > One suspicion would be that it is not possible to define what partitioner to > use for the join. > Any insight or help is greatly appreciated. > *Example code of the problem* > {code:java} > static Topology createTopoology(){ > var builder = new StreamsBuilder(); > KTable<String, String> tableB = builder.table("B", > stringMaterialized("table.b")); > builder > .stream("A", Consumed.with(Serde.of(KeyA.class), > Serde.of(EventA.class))) > .repartition(repartitionTopicA()) > .toTable(Named.as("table.a"), aMaterialized("table.a")) > .join(tableB, EventA::getKeyB, topicAandBeJoiner(), > Named.as("join.ab"), joinMaterialized("join.ab")) > .toStream() > .to("output", with(...)); > return builder.build(); > } > private static Materialized<KeyA, EventA> aMaterialized(String name) { > Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = > Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() { > Repartitioned<DriverPeriod, DriverCosts> repartitioned = > Repartitioned.as("driverperiod"); > return > repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)) > .withStreamPartitioner(topicAPartitioner()) > .withNumberOfPartitions(4); > } > private static StreamPartitioner<DriverPeriod, DriverCosts> > topicAPartitioner() { > return (topic, key, value, numPartitions) -> > Math.abs(key.getKeyB().hashCode()) % numPartitions; > } > private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> > joinMaterialized(String name) { > Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> > table = Materialized.as(name); > return > table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class)); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)