[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566
 ] 

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

Hi, yes, that is kind of where I am coming from. I completely understand where 
you are. 
Logging Change<> objects (the changelog case) is just one implementation of 
this repartitioning, and mine is another. I am very familiar with my approach, 
as I wrote some Samza apps using it. It has many benefits that may or may not 
be of interest: repartition topics can also be used to bootstrap, there are 
fewer copies of the data (no need to make state HA, see previous), etc. What we 
are still missing here is a mutual understanding of what I think key widening 
does and how to expose that to users in a sane manner.

Let me try it with your JSON syntax. This is the very example we have, and 
this ticket's feature would allow me to build it at the DSL level of the API.

So let's say I have 3 tables: A, B, C. I want to reach a point where I have 
C => <C,List<Join<A,B>>>. This will then be read by our application servers, 
which serve it as a faster way to retrieve the data than, say, the original 
MySQL. B has foreign keys to A and C.

 
All tables start off as one topic each, keyed by the table's primary key:
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to repartition B by A.PK now, in this first example without a 
widened key.
The layout stays B.PK => B, but the topic is partitioned by A.PK accordingly.
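
To make this repartitioning step concrete, here is a minimal sketch in plain Java. It assumes a simple hash-modulo partitioner; RepartitionSketch and partitionFor are hypothetical names for illustration and not Kafka API (Kafka's default partitioner hashes the serialized key bytes instead). The point is only that the record key stays B.PK while the partition follows A.PK, so a change to the A.PK field of a B record can move it to another partition.

```java
class RepartitionSketch {

    /**
     * Hypothetical partitioner: hash of the partitioning field modulo the
     * partition count. This is an assumption for illustration only.
     */
    static int partitionFor(String partitionField, int numPartitions) {
        return Math.floorMod(partitionField.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // The record key is B.PK = "b1" in both cases; only the partition
        // is derived from the A.PK foreign key carried in the value.
        System.out.println("b1 with A.PK=a1 -> partition "
                + partitionFor("a1", numPartitions));
        System.out.println("b1 with A.PK=a2 -> partition "
                + partitionFor("a2", numPartitions));
    }
}
```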

Then I can do the join with A and get
B.PK => joined<B,A>

As in your previous comment:
{quote}
Then a join result of
{a="a1", joined = join("a1-pre", "c1")} 
{quote}
Note the key stays B.PK (unwidened).
Now I am going to repartition based on C.PK, still maintaining
B.PK => joined<B,A>
as the topic layout. 
Now shit hits the fan, as I do my aggregation to get 
C.PK => List<Joined<A,B>>

How would this aggregator look now?

{code:java}
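// key is the unwidened B.PK; a null value means the join result was deleted.
List<Joined<A,B>> apply(B.PK key, Joined<A,B> value, List<Joined<A,B>> current)
{
   // Index the current list by B.PK so entries can be replaced or removed.
   // bKeyExtractorValueMapper maps a Joined<A,B> to its B.PK.
   Map<B.PK, Joined<A,B>> m = listToMap(current, bKeyExtractorValueMapper);
   if (value == null)
   {
      m.remove(key);
   } else
   {
      m.put(key, value);
   }
   return new ArrayList<>(m.values());
}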
{code}

This wouldn't be much different with logged Change<Joined<A,B>>; only the 
remove and add would be two methods. The problem is that it doesn't look 
wrong, but this code now has a race condition. Think about an update to the 
A.PK field of a B record that forces it to switch partitions (the C.PK value 
remains the same). We publish a delete to the old partition and the new value 
to the new partition. Then we do the join. Then we repartition on the unchanged 
C.PK. Our code above will see B.PK => null (remove) and B.PK => Joined<A,B> 
(add) in no particular order, because the two records come from different 
partitions. Hence the output is undefined. If the API had forced the key to be 
widened to Joined<A.PK,B.PK>, the error would not happen and users would be 
aware of what happens on repartitioning. I thought this through, and it also 
happens with logging Change<>, as that is really just another implementation.
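
Here is a minimal sketch of that race in plain Java, with no Kafka dependency. Everything in it is an assumption for illustration: KeyWideningDemo, Event, the string keys, and the two aggregate methods are hypothetical names, and a String stands in for Joined<A,B>. It replays the delete and the add for a B record whose A.PK changes from a1 to a2, once in each arrival order, against state keyed by B.PK only and against state keyed by the widened (A.PK, B.PK) pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyWideningDemo {

    /** One record reaching the aggregator; a null value is a delete. */
    static final class Event {
        final String aPk;   // A.PK the B record had when the event was produced
        final String bPk;   // B.PK of the B record
        final String value; // stands in for Joined<A,B>; null means remove

        Event(String aPk, String bPk, String value) {
            this.aPk = aPk;
            this.bPk = bPk;
            this.value = value;
        }
    }

    // b1 initially joined with a1, then its A.PK field changes to a2.
    static final Event INITIAL = new Event("a1", "b1", "joined(a1,b1)");
    static final Event DELETE_OLD = new Event("a1", "b1", null);
    static final Event ADD_NEW = new Event("a2", "b1", "joined(a2,b1)");

    static List<Event> deleteThenAdd() {
        return Arrays.asList(INITIAL, DELETE_OLD, ADD_NEW);
    }

    static List<Event> addThenDelete() {
        return Arrays.asList(INITIAL, ADD_NEW, DELETE_OLD);
    }

    /** Unwidened: state is keyed by B.PK only, as in the aggregator above. */
    static List<String> aggregateUnwidened(List<Event> events) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Event e : events) {
            if (e.value == null) {
                state.remove(e.bPk);
            } else {
                state.put(e.bPk, e.value);
            }
        }
        return new ArrayList<>(state.values());
    }

    /** Widened: state is keyed by the pair (A.PK, B.PK). */
    static List<String> aggregateWidened(List<Event> events) {
        Map<List<String>, String> state = new LinkedHashMap<>();
        for (Event e : events) {
            List<String> key = Arrays.asList(e.aPk, e.bPk);
            if (e.value == null) {
                state.remove(key);
            } else {
                state.put(key, e.value);
            }
        }
        return new ArrayList<>(state.values());
    }

    public static void main(String[] args) {
        System.out.println("unwidened, delete then add: "
                + aggregateUnwidened(deleteThenAdd()));
        System.out.println("unwidened, add then delete: "
                + aggregateUnwidened(addThenDelete()));
        System.out.println("widened,   delete then add: "
                + aggregateWidened(deleteThenAdd()));
        System.out.println("widened,   add then delete: "
                + aggregateWidened(addThenDelete()));
    }
}
```

With the unwidened key, the final list contains joined(a2,b1) when the delete arrives first and is empty when the add arrives first. With the widened key, the delete only ever touches the (a1,b1) entry, so both orders end in the same list.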

I hope this finally clarifies the key widening I am talking about. If not, 
maybe we should have a short Skype call or something. 
My further recommendation is to not implement these joins as logged Change<>, 
as that is more resource intensive and less efficient, and also makes the API 
more complicated.

PS: Hive has seen all join types, with MapJoins, skewed joins, you name it. 
All of these are applicable to streams as well. Maybe keep them in the back of 
your head.

> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> make the two joining streams be on the same key. This is a drawback for 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
