[
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566
]
Jan Filipiak commented on KAFKA-3705:
-------------------------------------
Hi, yes that is kinda where I am coming from. I completely understand where you
are.
Logging the changelog (Change<> objects) is just one implementation of this
repartitioning, and mine is another. I am very familiar with my approach, as I
wrote some Samza apps using it. It has several benefits that may or may not be
of interest: repartition topics can also be used to bootstrap, there are fewer
copies of the data (no need to make state HA, see previous), etc. What we are
still missing here is a mutual understanding of what I think key widening does
and how to expose it to users in a sane manner.
Let me try it with your JSON syntax. This is the very example we have, and this
ticket's feature would allow me to build it at the DSL level of the API.
So let's say I have 3 tables: A, B and C. I want to reach a point where I have
C => <C,List<Joined<A,B>>>
This will then be read by our application servers, which serve it as a faster
way to retrieve the data than, let's say, the original MySQL. B has foreign
keys to A and C.
All tables start off as one topic each, keyed by that table's primary key:
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C
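For orientation, here is a minimal batch sketch of the target view, computed
over in-memory maps instead of topics. The class name, the BRow fields and the
string stand-ins for rows are assumptions made for illustration. The sketch
also assumes an inner join and leaves out the C row itself.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical batch version of the target view: C.PK -> list of joined (A, B) rows.
class TargetViewSketch {

    // A row of B carrying foreign keys into A and C (field names are assumptions).
    static final class BRow {
        final String pk;
        final String aPk;
        final String cPk;

        BRow(String pk, String aPk, String cPk) {
            this.pk = pk;
            this.aPk = aPk;
            this.cPk = cPk;
        }
    }

    // Joins every B row with its A row and groups the results by C.PK.
    static Map<String, List<String>> buildView(Map<String, String> tableA, List<BRow> tableB) {
        Map<String, List<String>> view = new TreeMap<>();
        for (BRow b : tableB) {
            String a = tableA.get(b.aPk);
            if (a == null) {
                continue; // assumed inner join: skip B rows without a matching A row
            }
            view.computeIfAbsent(b.cPk, k -> new ArrayList<>())
                .add("joined(" + a + "," + b.pk + ")");
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, String> tableA = new LinkedHashMap<>();
        tableA.put("a1", "A1");
        tableA.put("a2", "A2");

        List<BRow> tableB = new ArrayList<>();
        tableB.add(new BRow("b1", "a1", "c1"));
        tableB.add(new BRow("b2", "a2", "c1"));
        tableB.add(new BRow("b3", "a1", "c2"));

        // prints {c1=[joined(A1,b1), joined(A2,b2)], c2=[joined(A1,b3)]}
        System.out.println(buildView(tableA, tableB));
    }
}
```

The streaming pipeline discussed below has to maintain this same result
incrementally, one update at a time.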
I am now going to repartition B by A.PK, in this first example without a
widened key.
The layout stays B.PK => B, but the topic is partitioned by A.PK.
Then I can do the join with A and get
B.PK => joined<B,A>
as in your previous comment:
{quote}
Then a join result of
{a="a1", joined = join("a1-pre", "c1")}
{quote}
Note that the key stays B.PK (unwidened).
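A minimal sketch of this first repartitioning step: the partition is chosen
from the foreign key (A.PK), while the message key stays B.PK. The class name,
the hash-modulo routing and the partition count are assumptions for
illustration; this is not Kafka's own partitioner.

```java
import java.util.Objects;

// Hypothetical sketch: route a record by its foreign key, leave the message key untouched.
class ForeignKeyPartitioner {

    // The partition is derived from A.PK, not from the message key B.PK.
    static int partitionFor(String foreignKey, int numPartitions) {
        return Math.floorMod(Objects.hashCode(foreignKey), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed partition count

        // The message key is "b1" in both cases; only the A.PK field decides the partition,
        // so changing A.PK can move the same B.PK to another partition.
        System.out.println("b1 with A.PK=a1 -> partition " + partitionFor("a1", numPartitions));
        System.out.println("b1 with A.PK=a2 -> partition " + partitionFor("a2", numPartitions));
    }
}
```

Because the key does not record which A.PK a message was routed by, two
messages for the same B.PK can sit in different partitions of the same topic.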
Now I am going to repartition based on C.PK still maintaining
B.PK => joined<B,A>
as the topic layout.
Now the shit hits the fan, as I am doing my aggregation to arrive at
C.PK => List<Joined<A,B>>
What would this aggregator look like now?
{code:java}
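// Aggregator for one C.PK; the incoming key is still the unwidened B.PK.
// B.PK is assumed to be B's primary-key type; listToMap and bKeyExtractor
// stand in for application helpers that index the list by B.PK.
List<Joined<A, B>> apply(B.PK key, Joined<A, B> value, List<Joined<A, B>> current) {
    Map<B.PK, Joined<A, B>> m = listToMap(current, bKeyExtractor);
    if (value == null) {
        m.remove(key);       // delete: drop the entry for this B.PK
    } else {
        m.put(key, value);   // add or update the entry for this B.PK
    }
    return new ArrayList<>(m.values());
}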
{code}
This wouldn't be much different with logged Change<Joined<A,B>> objects; only
the remove and the add would be two separate methods. The problem is that it
doesn't look wrong, yet this code now has a race condition. Think about an
update to the A.PK field of a B record that forces it to switch partitions
(the C.PK value remains the same). We then publish a delete to the old
partition and the new value to the new partition. Then we do the join, and
then we repartition on the unchanged C.PK. Our code above will now see
B.PK => null (remove) and B.PK => Joined<A,B> (add)
in no particular order, hence the output is undefined. If the API had forced
us to widen the key to Joined<A.PK,B.PK>, the error would not happen, and
users would be aware of what happens on repartitioning. I thought this
through, and it also happens when logging Change<>, as that is really just
another implementation.
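The race described above can be reproduced without Kafka. The following is a
minimal simulation of the aggregation state for a single C.PK, under stated
assumptions: the class and method names are made up, values are plain strings,
and the widened key Joined<A.PK,B.PK> is represented as the string "a1|b1".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Kafka-free simulation of the aggregation for a single C.PK.
class KeyWideningDemo {

    // One repartitioned message; a null value means "delete".
    static final class Update {
        final String key;
        final String value;

        Update(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies messages in arrival order, mirroring the aggregator's remove/put logic.
    static List<String> aggregate(Map<String, String> initial, List<Update> arrivals) {
        Map<String, String> state = new LinkedHashMap<>(initial);
        for (Update u : arrivals) {
            if (u.value == null) {
                state.remove(u.key);
            } else {
                state.put(u.key, u.value);
            }
        }
        return new ArrayList<>(state.values());
    }

    public static void main(String[] args) {
        // B record b1 moves from A.PK = a1 to A.PK = a2; its C.PK is unchanged.

        // Unwidened key: the delete and the add both carry key "b1".
        Map<String, String> narrow = new LinkedHashMap<>();
        narrow.put("b1", "joined(a1,b1)");
        Update del = new Update("b1", null);
        Update add = new Update("b1", "joined(a2,b1)");
        System.out.println(aggregate(narrow, Arrays.asList(del, add))); // [joined(a2,b1)]
        System.out.println(aggregate(narrow, Arrays.asList(add, del))); // [] (record lost)

        // Widened key: the key also carries A.PK, so the two messages cannot collide.
        Map<String, String> wide = new LinkedHashMap<>();
        wide.put("a1|b1", "joined(a1,b1)");
        Update wideDel = new Update("a1|b1", null);
        Update wideAdd = new Update("a2|b1", "joined(a2,b1)");
        System.out.println(aggregate(wide, Arrays.asList(wideDel, wideAdd))); // [joined(a2,b1)]
        System.out.println(aggregate(wide, Arrays.asList(wideAdd, wideDel))); // [joined(a2,b1)]
    }
}
```

With the unwidened key the final state depends on arrival order. With the
widened key both orders converge to the same list, although both the old and
the new entry can be present for a moment in between.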
I hope this finally clarifies the key widening I am talking about. If not,
maybe we should have a short Skype call or something.
My further recommendation is not to implement these joins as logged Change<>
objects, as that is more resource intensive and less efficient, and it also
makes the API more complicated.
PS: Hive has seen all kinds of join types: map joins, skewed joins, you name
it. All of these are applicable to streams as well. Maybe keep them in the
back of your head.
> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Liquan Pei
> Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but
> with a "foreign key" {{a}}, and assuming they are read from two topics which
> are partitioned on {{a}} and {{b}} respectively, they need to do the
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already
> partitioned on {{a}}, users still need to do the pre-aggregation in order to
> make the two joining streams share the same key. This is a drawback for
> programmability and we should fix it.