[
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340814#comment-15340814
]
Guozhang Wang commented on KAFKA-3705:
--------------------------------------
Thanks for the feedback!
Re 1: Not sure I fully understand this. I thought you can pass a
{{StreamPartitioner}} when calling {{addSink}} which should be sufficient?
Re 2: We are aware of this, and as discussed in the wiki, our current proposal
is to use something similar to what you mentioned as {{range(K1 prefix)}}:
seek to the prefix, then stop iterating as soon as {{key.startsWith(prefix)}}
is false. There are some optimizations for prefix seeking in RocksDB, but we
would need to contribute back to RocksDB's JNI to make use of them.
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly
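A minimal sketch of the seek-and-stop idea from Re 2, assuming string keys laid
out as "PK|AK". A {{TreeMap}} stands in for the sorted store here; the class
name, method name and key layout are illustrative only and are not part of
Kafka Streams or RocksDB.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PrefixScanSketch {

    // Collects the values of all entries whose key starts with the given prefix.
    // The sorted map is a stand-in for a sorted key-value store (assumption).
    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> matches = new ArrayList<>();
        // Seek to the first key >= prefix, then walk forward in key order.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break; // past the prefix range, so no later key can match
            }
            matches.add(entry.getValue());
        }
        return matches;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("k1|a1", "v1");
        store.put("k1|a2", "v2");
        store.put("k2|a3", "v3");
        System.out.println(prefixScan(store, "k1|")); // prints [v1, v2]
    }
}
```

The scan cost is proportional to the number of matching keys plus one, since
the loop exits on the first key outside the prefix.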
Re 3: The idea is that for the repartitioning, we first transform the old
key-value pair <AK, AV> into <PK, <AK, AV>>. When sending that pair to the
re-partition topic, we specify a {{StreamPartitioner}} that partitions on the
combo <PK, AK> (remember its {{partition}} API takes both the key and the
value), and the joiner after the repartitioning is applied on <AV> only.
When the old value needs to be sent as well, we send the {new, old} pair
separately as two records, <PK-new, <AK, AV-new>> and <PK-old, <AK, AV-old>>,
still partitioned on the combos <PK-new, AK> and <PK-old, AK> respectively.
These two records may be sent to two different partitions and hence processed
by two different processors, which is expected behavior. Does that look
reasonable to you?
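
A rough sketch of partitioning on the combo <PK, AK> as described in Re 3. It
does not use the Kafka Streams classes; the method only mirrors the shape of a
partitioner that sees key, value and partition count. {{KeyedValue}}, the hash
choice and all names below are assumptions for illustration, and the old record
is assumed to be keyed on the old foreign key.

```java
import java.util.Objects;

public class ComboPartitionSketch {

    // Hypothetical wrapper: the original key AK travels next to AV once the
    // record has been re-keyed on PK.
    static final class KeyedValue {
        final String originalKey; // AK
        final String value;       // AV

        KeyedValue(String originalKey, String value) {
            this.originalKey = originalKey;
            this.value = value;
        }
    }

    // Picks a partition from the combo <PK, AK> rather than from PK alone.
    // AV is deliberately ignored so updates to the same <PK, AK> stay together.
    static int partition(String pk, KeyedValue value, int numPartitions) {
        int hash = Objects.hash(pk, value.originalKey);
        return Math.floorMod(hash, numPartitions); // always in [0, numPartitions)
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        KeyedValue newValue = new KeyedValue("ak-1", "av-new");
        KeyedValue oldValue = new KeyedValue("ak-1", "av-old");
        // Different PKs, same AK: the two records may land on different partitions.
        System.out.println(partition("pk-new", newValue, numPartitions));
        System.out.println(partition("pk-old", oldValue, numPartitions));
    }
}
```

Because the partition depends only on <PK, AK>, every update for a given pair is
routed to the same processor, while records for one PK spread over several
partitions.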
> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Liquan Pei
> Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but
> with a "foreign key" {{a}}, and assuming they are read from two topics which
> are partitioned on {{a}} and {{b}} respectively, they need to do the
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already
> partitioned on {{a}}, users still need to do the pre-aggregation in order to
> put the two joining streams on the same key. This is a drawback for
> programmability and we should fix it.