[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408786#comment-17408786 ]
Tomas Forsman edited comment on KAFKA-13261 at 9/2/21, 12:50 PM:
-----------------------------------------------------------------

Hi [~guozhang] and [~abellemare], thank you for your answers.

In our scenario the output topic is compacted on the same key as topic A, so it would be fine if intermediate events did not arrive. We want the final result. What we're seeing, though, is combinations missing completely.

We've read all events from the A, B and output topics, as well as from the internal topics created by the join. We expected all of them to contain 66 ids. As said, running the same data with one partition creates a perfect match where all events have passed through.

{noformat}
#Using 4 partitions
A                 : ids: 66
B                 : ids: 66
table.b-changelog : ids: 66
table.a-changelog : ids: 66
join.ab-changelog : ids: 20
output            : ids: 20
{noformat}

We have written several JUnit test cases with the TopologyTestDriver but are unable to reproduce it. In a local Docker environment we can reproduce the above scenario every time when using 4 partitions, and the problem goes away when using 1 partition.
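As a hedged aside, not taken from the reproduction: one reason a single partition could mask a partitioning mismatch is that every partitioner maps every key to partition 0 when there is only one partition. The sketch below compares the partition chosen from the id alone with the partition chosen from the whole composite key. `PartitionMismatchSketch`, `partitionOf` and `countMismatches` are hypothetical names, and `String.hashCode()` is a simplifying assumption standing in for the murmur2 hash that Kafka's default partitioner applies to the serialized key bytes.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionMismatchSketch {

    // Hypothetical hash partitioner. Assumption: String.hashCode() stands in
    // for the murmur2 hash Kafka's default partitioner uses on serialized keys.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts composite keys [id, period] whose partition differs depending on
    // whether only the id or the whole composite key is hashed.
    static int countMismatches(List<String[]> compositeKeys, int numPartitions) {
        int mismatches = 0;
        for (String[] key : compositeKeys) {
            int byIdOnly = partitionOf(key[0], numPartitions);
            int byWholeKey = partitionOf(key[0] + "|" + key[1], numPartitions);
            if (byIdOnly != byWholeKey) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // Composite keys shaped like the ones in the changelog dumps.
        List<String[]> keys = Arrays.asList(
                new String[] {"ID123", "202101"},
                new String[] {"ID123", "202102"},
                new String[] {"ID123", "202103"},
                new String[] {"ID123", "202104"},
                new String[] {"ID123", "202105"});

        System.out.println("mismatches with 1 partition:  " + countMismatches(keys, 1));
        System.out.println("mismatches with 4 partitions: " + countMismatches(keys, 4));
    }
}
```

With 1 partition the count is always 0, because any hash modulo 1 is 0; with 4 partitions the count depends on the hash values. This only illustrates the arithmetic and does not show what the foreign key join does internally.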
Below are the records read directly from the different topics for a specific id "ID123".
Columns are 'offset, timestamp, partition | [key] value'.

Using 4 partitions
{noformat}
table.a-changelog
5 1612435945196 0 | [ID123, 202101] A01
15 1614863137136 0 | [ID123, 202102] A02
25 1617882052260 0 | [ID123, 202103] A03
35 1620299210336 0 | [ID123, 202104] A04
45 1622804606823 0 | [ID123, 202105] A05

table.b-changelog
6 1622617868856 0 | [ID123] BBB

join.ab-changelog
0 1622617868856 0 | [ID123, 202104] A04, BBB

output
0 1622617868856 0 | [ID123, 202104] A04, BBB
{noformat}

Using 1 partition
{noformat}
table.a-changelog
28 1612435945196 0 | [ID123, 202101] A01
88 1614863137136 0 | [ID123, 202102] A02
149 1617882052260 0 | [ID123, 202103] A03
210 1620299210336 0 | [ID123, 202104] A04
269 1622804606823 0 | [ID123, 202105] A05

table.b-changelog
7 1622617868856 0 | [ID123] BBB

join.ab-changelog
28 1622617868856 0 | [ID123, 202101] A01, BBB
88 1622617868856 0 | [ID123, 202102] A02, BBB
149 1622617868856 0 | [ID123, 202103] A03, BBB
210 1622617868856 0 | [ID123, 202104] A04, BBB
269 1622804606823 0 | [ID123, 202105] A05, BBB

output
5 1622617868856 0 | [ID123, 202101] A01, BBB
15 1622617868856 0 | [ID123, 202102] A02, BBB
25 1622617868856 0 | [ID123, 202103] A03, BBB
35 1622617868856 0 | [ID123, 202104] A04, BBB
45 1622804606823 0 | [ID123, 202105] A05, BBB
{noformat}

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>
> Two incoming streams A and B.
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to only use 1 partition each solves the problem, so it seems
> like it has something to do with the foreign key join in combination with
> several partitions.
> One suspicion would be that it is not possible to define what partitioner to
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         // Repartition A to 4 partitions, keyed by the foreign key (see topicAPartitioner).
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         // Foreign key join from A to B on EventA::getKeyB.
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     // Partition on the foreign key part of the composite key only.
>     return (topic, key, value, numPartitions) ->
>         Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)