[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357696#comment-15357696
 ] 

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Yeah this clarifies a lot. I see that you are not trying to create a "table" 
inside Kafka Streams, but just want to re-partition the input binlog stream, 
and you are working on the lower-level APIs (originally I was confused since 
you mentioned "KTable", which is only available in the higher-level DSL). Here 
is my understanding / suggestions:

1. the original Kafka topic that was directly piped from your MySQL bin-log has 
the form (key -> value)

a -> {a', other-fields}

and you want to repartition it into a new topic keyed on field a', which is 
also log-compacted on a'.


2. so instead of representing a delete / update record as "a -> null" / "a -> 
a'-new, other-fields-new", you can send the messages in a different format 
(there are already a few tools, including Kafka Connect, that allow you to 
represent your binlog entries with such flexibility):

a -> {pair{a'-old, a'-new}, other-fields-current-value }

So that deletion becomes:   a -> {pair{a'-old, null}, other-fields}


3. in this case, you can access the value field to extract a' for partitioning 
even for deletion cases, and for an update record you can then send the 
following two records to the re-partition topic:


a'-old -> null,

a'-new -> other-fields.
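
A minimal sketch of step 3 in plain Java, with no Kafka dependency, just to 
show the mapping from one change record to the records sent to the 
re-partition topic. The names BinlogChange and toRepartitionRecords are 
hypothetical, as is the use of plain strings for keys and values. It also 
assumes that an insert carries a null a'-old, and that no tombstone is needed 
when a' did not change; neither is stated above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class RepartitionSketch {

    /** Hypothetical value format from step 2: pair{a'-old, a'-new} plus the other fields. */
    static final class BinlogChange {
        final String oldKey;      // a'-old; assumed null for an insert
        final String newKey;      // a'-new; null for a delete
        final String otherFields; // current value of the remaining fields

        BinlogChange(String oldKey, String newKey, String otherFields) {
            this.oldKey = oldKey;
            this.newKey = newKey;
            this.otherFields = otherFields;
        }
    }

    /**
     * Maps one change record to the records for the re-partition topic, keyed on a'.
     * An entry with a null value stands for a tombstone (a delete under log compaction).
     */
    static List<Map.Entry<String, String>> toRepartitionRecords(BinlogChange change) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        boolean keyChanged = !Objects.equals(change.oldKey, change.newKey);

        // a'-old -> null: retire the old key when it existed and is no longer current.
        // Skipping this when a' is unchanged is an assumption of this sketch.
        if (change.oldKey != null && keyChanged) {
            out.add(new SimpleEntry<String, String>(change.oldKey, null));
        }
        // a'-new -> other-fields: write the current value under the new key.
        if (change.newKey != null) {
            out.add(new SimpleEntry<String, String>(change.newKey, change.otherFields));
        }
        return out;
    }

    public static void main(String[] args) {
        // Update that moves the row from a' = "x" to a' = "y".
        System.out.println(toRepartitionRecords(new BinlogChange("x", "y", "f1")));
        // Delete of the row that had a' = "x".
        System.out.println(toRepartitionRecords(new BinlogChange("x", null, "f1")));
    }
}
```

In a real topology the same mapping would live inside whatever processor 
forwards records to the re-partition topic.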


Will that work for you?



> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to do the 
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' 
> is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> put the two joining streams on the same key. This is a drawback for 
> programmability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
