[ https://issues.apache.org/jira/browse/KAFKA-13261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416776#comment-17416776 ]

Victoria Xia edited comment on KAFKA-13261 at 9/17/21, 11:46 PM:
-----------------------------------------------------------------

Hey [~mjsax] [~guozhang], after opening the KIP above I noticed a slight 
"problem" with the interfaces through which users pass custom partitioners. Can 
you double-check my reasoning below?

The "problem" is that the interfaces in the KIP currently ask users to provide 
{{StreamPartitioner<K, V> thisPartitioner}} and 
{{StreamPartitioner<KO, VO> otherPartitioner}} (representing the partitioning 
strategies for the left and right/foreign-key tables, respectively), since these 
are the partitioners that users already know. However, based on the messages 
actually passed through the subscription and response topics, the FK join 
implementation needs a partitioner of the form 
{{StreamPartitioner<KO, SubscriptionWrapper<K>>}} instead of 
{{otherPartitioner}}, and a partitioner of the form 
{{StreamPartitioner<K, SubscriptionResponseWrapper<VO>>}} instead of 
{{thisPartitioner}}.

It doesn't make sense to ask users to pass these other partitioners since 
{{SubscriptionWrapper}} and {{SubscriptionResponseWrapper}} are internal 
implementation details. What we really want is to require that any custom 
partitioners select a partition based only on the message key, without taking 
the message value into consideration. I think this is a reasonable requirement 
for the right/foreign-key table since the subscription store mechanism (in the 
FK join implementation) won't work at all unless all messages with the same 
(foreign) key are always sent to the same partition. We don't technically need 
to require this for the left table since, if we wanted to, we could send the 
original value along with the subscription to ensure that the response is 
routed back to the correct partition, but this seems unnecessarily complicated 
and would also bloat the subscription store and topic.
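A minimal sketch of why the right/foreign-key side needs key-only partitioning, assuming plain {{String}} keys and values as stand-ins for the foreign key and the internal {{SubscriptionWrapper}} payload. Both partitioners and the {{SubscriptionRouting}} class are hypothetical: two subscriptions for the same foreign key always share a partition under the key-only strategy, but can be split once the value is hashed too.

```java
import java.util.Objects;

// Two hypothetical partitioning strategies for subscription messages keyed by
// a foreign key. A String value stands in for the internal SubscriptionWrapper.
final class SubscriptionRouting {
    private SubscriptionRouting() {}

    // Key-only: the partition depends on the foreign key alone.
    static int keyOnly(String foreignKey, String value, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (foreignKey.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Value-aware: the partition also depends on the message value,
    // which is what the proposed requirement would rule out.
    static int keyAndValue(String foreignKey, String value, int numPartitions) {
        return (Objects.hash(foreignKey, value) & 0x7fffffff) % numPartitions;
    }
}
```

For example, with 4 partitions {{keyOnly("b1", "v1", 4)}} equals {{keyOnly("b1", "v2", 4)}}, whereas {{keyAndValue}} sends those two subscriptions to different partitions, so the subscription store on either partition would be missing one of them.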

As such, I think we should require that any custom partitioner used for a table 
in an FK join select partitions based only on the message key (and not the 
value). Do you agree?

If so, we could make this requirement explicit in the FK join interfaces by 
asking users for {{StreamPartitioner<K, Void> thisPartitioner}} and 
{{StreamPartitioner<KO, Void> otherPartitioner}} instead of the original 
partitioners. Or we could keep the interfaces as is 
({{StreamPartitioner<K, V> thisPartitioner}} and 
{{StreamPartitioner<KO, VO> otherPartitioner}}), since users probably already 
have these handy anyway, and leave a note in the javadocs to explain the 
requirement. I have a preference for the latter but am curious what you think. 
Thanks!
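The latter option relies on the key-only requirement: the FK join could then reuse the user's {{StreamPartitioner<K, V>}} for an internal value type by never handing it a real value. A minimal sketch, assuming a stand-in {{Partitioner}} interface with the same {{partition(topic, key, value, numPartitions)}} shape as Kafka Streams' {{StreamPartitioner}}; {{ValueIgnoringPartitioner}} and {{ignoreValue}} are hypothetical names, not Kafka APIs:

```java
// Stand-in with the same shape as Kafka Streams' StreamPartitioner<K, V>.
interface Partitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

final class ValueIgnoringPartitioner {
    private ValueIgnoringPartitioner() {}

    // Hypothetical adapter: reuse a user-supplied partitioner for an internal
    // value type W by always passing a null value. This is only correct if the
    // user's partitioner looks at the key alone (the proposed requirement).
    static <K, V, W> Partitioner<K, W> ignoreValue(Partitioner<K, V> userPartitioner) {
        return (topic, key, ignoredValue, numPartitions) ->
            userPartitioner.partition(topic, key, null, numPartitions);
    }
}
```

A user partitioner that reads {{value}} would get {{null}} here and could throw or pick a different partition, which is what the javadoc note would need to warn against.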



> KTable to KTable foreign key join lose events when using several partitions
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-13261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13261
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0, 2.7.1
>            Reporter: Tomas Forsman
>            Assignee: Victoria Xia
>            Priority: Major
>              Labels: kip
>         Attachments: KafkaTest.java
>
>
> KIP-775: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins]
>  
> Two incoming streams, A and B.
> Stream A uses a composite key [a, b].
> Stream B has key [b].
> Stream B has 4 partitions and stream A has 1 partition.
> What we try to do is repartition stream A to have 4 partitions too, then put
> both A and B into KTables and do a foreign key join from A to B.
> When doing this, not all messages end up in the output topic.
> Repartitioning both to use only 1 partition each solves the problem, so it
> seems like it has something to do with the foreign key join in combination
> with several partitions.
> One suspicion is that it is not possible to define which partitioner to use
> for the join.
> Any insight or help is greatly appreciated.
> *Example code of the problem*
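> {code:java}
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.kstream.Named;
> import org.apache.kafka.streams.kstream.Repartitioned;
> import org.apache.kafka.streams.processor.StreamPartitioner;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> // KeyA, EventA, Serde.of(...), stringMaterialized(...), topicAandBeJoiner() and
> // the with(...) output serdes are the reporter's own helpers (not shown here).
> static Topology createTopology() {
>     var builder = new StreamsBuilder();
>     // Table B: 4 partitions, keyed by b.
>     KTable<String, String> tableB = builder.table("B", stringMaterialized("table.b"));
>     builder
>         .stream("A", Consumed.with(Serde.of(KeyA.class), Serde.of(EventA.class)))
>         // Repartition A from 1 to 4 partitions, routing each record by b.
>         .repartition(repartitionTopicA())
>         .toTable(Named.as("table.a"), aMaterialized("table.a"))
>         // Foreign key join from A to B, extracting b from each A value.
>         .join(tableB, EventA::getKeyB, topicAandBeJoiner(),
>               Named.as("join.ab"), joinMaterialized("join.ab"))
>         .toStream()
>         .to("output", with(...));
>     return builder.build();
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> aMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
>
> private static Repartitioned<KeyA, EventA> repartitionTopicA() {
>     Repartitioned<KeyA, EventA> repartitioned = Repartitioned.as("driverperiod");
>     return repartitioned.withKeySerde(Serde.of(KeyA.class))
>         .withValueSerde(Serde.of(EventA.class))
>         .withStreamPartitioner(topicAPartitioner())
>         .withNumberOfPartitions(4);
> }
>
> // Places each A record by the b part of its composite key.
> private static StreamPartitioner<KeyA, EventA> topicAPartitioner() {
>     // Masking the sign bit keeps the result non-negative; Math.abs(...) would
>     // return a negative number when hashCode() is Integer.MIN_VALUE.
>     return (topic, key, value, numPartitions) ->
>         (key.getKeyB().hashCode() & 0x7fffffff) % numPartitions;
> }
>
> private static Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> joinMaterialized(String name) {
>     Materialized<KeyA, EventA, KeyValueStore<Bytes, byte[]>> table = Materialized.as(name);
>     return table.withKeySerde(Serde.of(KeyA.class)).withValueSerde(Serde.of(EventA.class));
> }
> {code}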
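To connect this back to the report: table A is partitioned by {{topicAPartitioner()}}, which hashes only {{b}}, while a message routed to table A's partitions with default partitioning is placed by hashing the whole key (assuming, as the comment above suggests, that FK join responses are routed back this way). A simplified sketch of that mismatch, assuming string keys and using {{String.hashCode()}} as a stand-in for Kafka's default partitioner (which actually applies murmur2 to the serialized key bytes); {{CompositeKeyRouting}} and the {{a|b}} key format are hypothetical:

```java
// Simplified model of the reporter's topology: keys of table A are composite
// [a, b], and topicAPartitioner() places each record by b alone.
final class CompositeKeyRouting {
    private CompositeKeyRouting() {}

    // Mirrors topicAPartitioner(): hash only the b part of the key.
    static int byB(String a, String b, int numPartitions) {
        return (b.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Stand-in for default partitioning, which hashes the whole key.
    // (Kafka's real default uses murmur2 on the serialized key bytes.)
    static int byWholeKey(String a, String b, int numPartitions) {
        return ((a + "|" + b).hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts keys whose two routes disagree: a record stored on one partition
    // but answered on another would not find its own row there.
    static int mismatches(String[] as, String b, int numPartitions) {
        int count = 0;
        for (String a : as) {
            if (byB(a, b, numPartitions) != byWholeKey(a, b, numPartitions)) {
                count++;
            }
        }
        return count;
    }
}
```

For the keys {{a0}} through {{a9}} with {{b1}} and 4 partitions, some keys land on different partitions under the two strategies; a response routed by the whole key can then reach a task that does not hold the matching row of table A, which is one way join results could go missing.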



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
