[ 
https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17409883#comment-17409883
 ] 

Tomas Forsman edited comment on KAFKA-13261 at 9/6/21, 2:26 PM:
----------------------------------------------------------------

Hi [~guozhang], the local one is a full-fledged, although scaled-down, setup 
running the actual services needed to reproduce the problem. We also have a 
test environment and a staging environment with the complete setup, where we 
can reproduce the problem as well.

Thanks [~vvcephei], I'll look into setting up a test case based on the one you 
sent me.

About removing the partitioner: the example I've given is the minimal one 
needed to show the problem. Our topologies are bigger in general, and we were 
hoping to be able to use several partitions, with a custom partitioner, instead 
of just one. 
I will follow your suggestion and try without it.

was (Author: xnix):
Hi [~guozhang], the local one is a full-fledged, although scaled-down, setup 
running the actual services needed to reproduce the problem. We also have a 
test environment and a staging environment with the complete setup, where we 
can reproduce the problem as well.

Thanks [~vvcephei], I'll look into setting up a test case based on the one you 
sent me.

About removing the partitioner: the example I've given is the minimal one 
needed to show the problem. Our topologies are bigger in general, and we were 
hoping to be able to use several partitions instead of just one. However, I will 
follow your suggestion and try without it.

> KTable to KTable foreign key join loses events when using several partitions
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Priority: Major
>         Attachments: KafkaTest.java
>
>
> Two incoming streams, A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put 
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it 
> seems to have something to do with the foreign key join in combination with 
> several partitions.
> One suspicion is that it is not possible to define which partitioner to 
> use for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
> {code:java}
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         // Repartition A to 4 partitions with the custom partitioner below.
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         // Foreign key join from A to B on EventA's keyB.
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>             Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned
>         .withKeySerde(Serde.of(KeyA.class))
>         .withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
> // Partitions stream A by the foreign-key part (keyB) of its composite key.
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     return (topic, key, value, numPartitions) ->
>         Math.abs(key.getKeyB().hashCode()) % numPartitions;
> }
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}
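
The suspicion in the description (that the join cannot be told which partitioner to use) can be pictured with a small, self-contained sketch. It is only an illustration, not Kafka code: the `KeyA` class, the method names, and the use of `hashCode` are assumptions made here. Kafka's real default partitioner hashes the serialized key bytes with murmur2, so the concrete partition numbers would differ. The point is only that a partitioner looking at `b` alone and a partitioner looking at the whole key [a, b] can pick different partitions for the same record.

```java
import java.util.Objects;

class PartitionSketch {
    // Hypothetical stand-in for the composite key [a, b] of stream A.
    static final class KeyA {
        final String a;
        final String b;
        KeyA(String a, String b) { this.a = a; this.b = b; }
    }

    // Partition derived from the foreign-key part b only,
    // in the spirit of the custom partitioner in the example.
    static int partitionByB(KeyA key, int numPartitions) {
        return Math.floorMod(key.b.hashCode(), numPartitions);
    }

    // Partition derived from the whole key; a simplified stand-in
    // for a default partitioner (assumption: not Kafka's murmur2 hash).
    static int partitionByFullKey(KeyA key, int numPartitions) {
        return Math.floorMod(Objects.hash(key.a, key.b), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        KeyA key = new KeyA("a1", "b1");
        System.out.println("by b only:   " + partitionByB(key, numPartitions));
        System.out.println("by full key: " + partitionByFullKey(key, numPartitions));
    }
}
```

If some internal step of the join routed records by the full key while table A itself is partitioned by `b`, a record could be sent to a partition other than the one holding its table A entry. With a single partition both functions always return 0, which would be consistent with the observation that the problem disappears in that case.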



