[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomas Forsman updated KAFKA-13261:
----------------------------------
    Description: 
Two incoming streams A and B. 

Stream A uses a composite key [a, b]

Stream B has key [b]

Stream B has 4 partitions and stream A has 1 partition.

What we try to do is repartition stream A to 4 partitions, then put both A and 
B into KTables and do a foreign key join from A to B.

When doing this, not all messages end up in the output topic.

Repartitioning both to use only 1 partition each solves the problem, so it seems 
like it has something to do with the foreign key join in combination with 
several partitions.

One suspicion would be the internal topics created for the join and that it is 
not possible to define what partitioner to use for the join.

Any insight or help is greatly appreciated.

*Example code of the problem*
{code:java}
static Topology createTopoology(){
    var builder = new StreamsBuilder();

    KTable<String, String> tableB = builder.table("B",  
stringMaterialized("table.b"));

    builder
        .stream("A", Consumed.with(Serde.of(KeyA.class), 
Serde.of(EventA.class)))
        .repartition(repartitionTopicA())
        .toTable(Named.as("table.a"), aMaterialized("table.a"))
        .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
Named.as("join.ab"), joinMaterialized("join.ab"))
        .toStream()
        .to("output", with(...));

    return builder.build();
}

private static Materialized<KeyA, EventA> aMaterialized(String name) {
  Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = 
Materialized.as(name);
  return 
table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
}

private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
    Repartitioned<DriverPeriod, DriverCosts> repartitioned = 
Repartitioned.as("driverperiod");
    return 
repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
        .withStreamPartitioner(topicAPartitioner())
        .withNumberOfPartitions(4);
}

private static StreamPartitioner<DriverPeriod, DriverCosts> topicAPartitioner() 
{
    return (topic, key, value, numPartitions) -> 
Math.abs(key.getKeyB().hashCode()) % numPartitions;
}

private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> 
joinMaterialized(String name) {
    Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> table 
= Materialized.as(name);
    return 
table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
}

{code}

  was:
Two incoming streams A and B. 

Stream A uses a composite key [a, b]

Stream B has key [b]

Stream B has 4 partitions and stream A has 1 partition.

What we try to do is repartition stream A to 4 partitions, then put both A and 
B into KTables and do a foreign key join from A to B.

When doing this, not all messages end up in the output topic.

Repartitioning both to use only 1 partition each solves the problem, so it seems 
like it has something to do with the foreign key join in combination with 
several partitions.

One suspicion would be the internal topics created for the join and that it is 
not possible to define what partitioner to use for the join.

Any insight or help is greatly appreciated.

*Example code of the problem*
{code:java}
static Topology createTopoology(){
    var builder = new StreamsBuilder();

    KTable<String, String> tableB = builder.table("B",  
stringMaterialized("table.b"));

    builder
        .stream("A", Consumed.with(Serde.of(KeyA.class), 
Serde.of(EventA.class)))
        .repartition(repartitionTopicA())
        .toTable(Named.as("table.a"), driverPeriodMaterialized("table.a"))
        .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
Named.as("join.ab"), joinMaterialized("join.ab"))
        .toStream()
        .to("output", with(...));

    return builder.build();
}

private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
    Repartitioned<DriverPeriod, DriverCosts> repartitioned = 
Repartitioned.as("driverperiod");
    return 
repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
        .withStreamPartitioner(topicAPartitioner())
        .withNumberOfPartitions(4);
}

private static StreamPartitioner<DriverPeriod, DriverCosts> topicAPartitioner() 
{
    return (topic, key, value, numPartitions) -> 
Math.abs(key.getKeyB().hashCode()) % numPartitions;
}

private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> 
joinMaterialized(String name) {
    Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> table 
= Materialized.as(name);
    return 
table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
}

{code}


> KTable to KTable foreign key join does not produce output when using several 
> partitions
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>
> Two incoming streams A and B. 
> Stream A uses a composite key [a, b]
> Stream B has key [b]
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to 4 partitions, then put both A 
> and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it seems 
> like it has something to do with the foreign key join in combination with 
> several partitions.
> One suspicion would be the internal topics created for the join and that it 
> is not possible to define what partitioner to use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopoology(){
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B",  
> stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), 
> Serde.of(EventA.class)))
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(), 
> Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA> aMaterialized(String name) {
>   Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = 
> Materialized.as(name);
>   return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<DriverPeriod, DriverCosts> repartitionTopicA() {
>     Repartitioned<DriverPeriod, DriverCosts> repartitioned = 
> Repartitioned.as("driverperiod");
>     return 
> repartitioned.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> private static StreamPartitioner<DriverPeriod, DriverCosts> 
> topicAPartitioner() {
>     return (topic, key, value, numPartitions) -> 
> Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> 
> joinMaterialized(String name) {
>     Materialized<DriverPeriod, DriverCosts, KeyValueStore<Bytes, byte[]>> 
> table = Materialized.as(name);
>     return 
> table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to