[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408977#comment-17408977
 ] 

John Roesler commented on KAFKA-13261:
--------------------------------------

Hi [~xnix] ,

 

Your suspicion is correct. TopologyTestDriver doesn't simulate partitions at 
all, so you won't be able to use it to test this case.

 

When it comes to a repro, you might be interested in this class, which verifies 
that foreign-key joins perform correctly when the input topics have different 
partitions: 
[https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java]

If we have a bug, my suspicion would be whether we're correctly capturing the 
partitioner that you're setting via Repartitioned. I'd suggest modifying that 
test to be closer to your example and seeing whether or not we still get the 
correct result.

 

On the subject of Repartitioned, I didn't quite follow why you're doing it. To 
be clear, when you're doing foreign-key joins, you do not need the two tables 
to have the same number of partitions, nor do you need them to be 
co-partitioned. This should work just fine:
{code:java}
  KTable<String, String> tableB = builder.table("B",  
stringMaterialized("table.b"));

    builder
        .stream("A", Consumed.with(Serde.of(KeyA.class), 
Serde.of(EventA.class)))
        .toTable(Named.as("table.a"), aMaterialized("table.a"))
        .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
Named.as("join.ab"), joinMaterialized("join.ab"))
        .toStream()
        .to("output", with(...));
{code}
Unless you have some other requirement for which you need the repartition 
operation, I'd suggest just completely dropping those repartition steps. At 
least, I'd suggest trying out removing them from the topology and verifying if 
you get the correct join results.

 

I hope this helps!

> KTable to KTable foreign key join loose events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and steams A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTable and do a foreign key join on from A to B
> When doing this, all messages does not end up in the output topic.
> Repartitioning both to only use 1 partition each solve the problem so it seem 
> like it has something to do with the foreign key join in combination with 
> several partitions. 
> One suspicion would be that it is not possible to define what partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B",  
> stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA> aMaterialized(String name) {
>   Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
>     Repartitioned<DriverPeriod, DriverCosts> repartitioned = 
> Repartitioned.as("driverperiod");
>     return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<DriverPeriod, DriverCosts> 
> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> 
> joinMaterialized(String name) {
>     Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> 
> table = Materialized.as(name);
>     return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to