[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340779#comment-15340779
 ] 

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

A few things I came across while building the current implementation on top 
of the Processor API.

1. Partitioning
I ended up needing to pass an additional ValueMapper<K,K1> into the method. 
I had to use it in the sink's partitioner to extract the 
_partition/join-key_ from the key that is used for the repartition topic. It 
had to be extracted from the key because I still need to be able to send null 
values to the correct partition for deletes. This came from the number of 
partitions being known only in the partitioner, not in the processor, which 
made the "API" kind of complicated. 
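
A minimal sketch of the partitioning idea above, in plain Java without any 
Kafka dependency. The class JoinKeyPartitioner, its method signature, and the 
use of hashCode() are assumptions made for illustration only; this is not the 
Kafka Streams partitioner interface and not the implementation described in 
this comment.

```java
import java.util.function.Function;

// Hypothetical stand-in for a sink partitioner; all names are illustrative.
class JoinKeyPartitioner<K, K1> {
    // Plays the role of the extra ValueMapper<K,K1>: repartition key -> join key.
    private final Function<K, K1> joinKeyExtractor;

    JoinKeyPartitioner(Function<K, K1> joinKeyExtractor) {
        this.joinKeyExtractor = joinKeyExtractor;
    }

    // The value is deliberately ignored: a delete carries a null value, so the
    // target partition has to be derivable from the key alone.
    <V> int partition(K key, V value, int numPartitions) {
        K1 joinKey = joinKeyExtractor.apply(key);
        // hashCode() is a simplification; a real partitioner would more likely
        // hash the serialized bytes of the join key.
        return Math.floorMod(joinKey.hashCode(), numPartitions);
    }
}
```

With a hypothetical string key of the form "joinKey|primaryKey" and an 
extractor that returns the part before the "|", an update for "kp1|ak1" and a 
delete (null value) for "kp1|ak2" land on the same partition.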

2. Range Select
The ValueMapper mentioned above also had to be passed into the 
RocksDBIterator. Having KeyValueIterator<K,V> range(K from, K to) is not 
"natural" for prefix range queries. What fits better is 
KeyValueIterator<K,V> range(K1 prefix), where Serde<K1> needs to produce 
bytes that are a prefix of the bytes produced by Serde<K>.
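
To illustrate what a prefix range query does, here is a rough sketch that 
uses a sorted in-memory map with String keys in place of RocksDB and 
serialized bytes. PrefixScan and its range method are hypothetical names, and 
the "joinKey|primaryKey" key layout is an assumption for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical helper; a sorted map stands in for the RocksDB store.
class PrefixScan {
    // Seek to the first key >= prefix, then read forward while the keys still
    // start with the prefix. In a lexicographically sorted store all keys
    // sharing a prefix are contiguous, so the scan can stop at the first miss.
    static <V> List<Map.Entry<String, V>> range(NavigableMap<String, V> store, String prefix) {
        List<Map.Entry<String, V>> hits = new ArrayList<>();
        for (Map.Entry<String, V> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break;
            }
            hits.add(entry);
        }
        return hits;
    }
}
```

The caller only supplies the prefix (the join key) and never has to construct 
artificial "from" and "to" keys, which is the point of range(K1 prefix).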

3. Key expansion
After a join in this fashion, the key is what I started referring to as 
widened. Say you have a KTable<AK,AV> that needs to be repartitioned and KP 
is the repartition key; then, independently of the other table, the new key 
of the result table must include both KP and AK, which is a weird thing 
compared to the traditional relational database way. Imagine having a result 
table KTable<Pair<AK,KP>,Pair<AV,XV>>: the formerly unique key AK is not 
unique anymore, because the processor might see the insert in one partition 
before the delete in the other (e.g. when the row's KP was updated). I think 
this should be embraced, because that is how it is. It should probably just 
be made apparent to the user, as it needs to be dealt with in downstream 
processors.
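
The non-uniqueness of AK can be shown with a small simulation. This is only a 
sketch: WidenedKeyTable is a hypothetical in-memory stand-in for the result 
table, with String for AK, KP and the joined value, and a null value meaning 
delete.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a result table keyed by the widened key Pair<AK,KP>.
class WidenedKeyTable {
    private final Map<Map.Entry<String, String>, String> rows = new HashMap<>();

    // Applies one change record; a null value is a delete.
    void apply(String ak, String kp, String value) {
        Map.Entry<String, String> key = new SimpleImmutableEntry<>(ak, kp);
        if (value == null) {
            rows.remove(key);
        } else {
            rows.put(key, value);
        }
    }

    // Number of result rows that currently carry the given AK.
    long rowsFor(String ak) {
        return rows.keySet().stream().filter(k -> k.getKey().equals(ak)).count();
    }
}
```

If a row "ak1" moves from "kpOld" to "kpNew" and the insert for 
("ak1","kpNew") is applied before the delete for ("ak1","kpOld"), rowsFor 
reports two rows for "ak1" until the delete arrives. Downstream processors 
have to tolerate that window.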

Unrelated to the topic of joining: the Processor API is not necessarily 
comfortable. I appreciate the beauty of the threading model, but stitching 
graphs together based on processor names and strings is trickier than I 
thought. Anyhow, really nice stream processing framework. It feels and looks 
so much better than what is out there, like Spark or Storm. Watching their 
desperate attempts to put state in is a joy. Nice work. As soon as our 
implementation is hardened in production, I can probably share it.


> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> // after this step, tableB' is partitioned on "a"
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...);
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> put the two joining streams on the same key. This is a drawback for 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
