[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2017-01-17 Thread Paul Vaughan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827015#comment-15827015
 ] 

Paul Vaughan commented on KAFKA-3705:
-

Healthcare data is intrinsically relational, but the bulk of the data is 
related to patients in the sense that if the patient did not exist, the data 
would not exist. For example, these FHIR Resources all depend on a Patient: 
Encounter, MedicationOrder, Observation, Condition, etc., while Practitioner, 
Organization, etc. exist independent of patients. This suggests that a good 
partitioning strategy for processing this data would be to partition by patient 
in order to get good concurrency with minimal repartitioning/distribution. Data 
that is independent of the patient would likely need to be distributed to all 
nodes, but that data is relatively small. 

Processing this data includes checking for referential integrity, dealing with 
out-of-order data (Encounters that are received before the Patient is 
received), and re-keying. For example, when an Encounter arrives, a downstream 
version of that Encounter needs to be created with the patient’s downstream 
key. Similarly, when a new Patient arrives it should be given a downstream key 
and any Encounters that reference this patient need to be updated and sent 
downstream.
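The out-of-order handling and re-keying described above can be pictured with the following plain-Java sketch. It is not Kafka Streams code, and everything in it is hypothetical: the class name, the downstream key format ("P1", "P2", ...) and the "encounterId@patientKey" output strings. It only shows Encounters being parked until their Patient arrives, then released with the Patient's downstream key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams code: Encounters that arrive before their
// Patient are parked, then re-keyed with the Patient's downstream key once it appears.
class PatientRekeyer {
    // upstream patient id -> downstream patient key ("P1", "P2", ... is an assumed format)
    private final Map<String, String> downstreamKeys = new HashMap<>();
    // upstream patient id -> encounter ids still waiting for that patient
    private final Map<String, List<String>> pending = new HashMap<>();
    private int nextKey = 1;

    /** A Patient arrived: assign its downstream key (only once) and release parked Encounters. */
    List<String> onPatient(String patientId) {
        String key = downstreamKeys.computeIfAbsent(patientId, id -> "P" + nextKey++);
        List<String> out = new ArrayList<>();
        for (String encounterId : pending.getOrDefault(patientId, List.of())) {
            out.add(encounterId + "@" + key); // downstream Encounter carries the downstream patient key
        }
        pending.remove(patientId);
        return out;
    }

    /** An Encounter arrived: emit it re-keyed if its Patient is known, otherwise park it. */
    List<String> onEncounter(String encounterId, String patientId) {
        String key = downstreamKeys.get(patientId);
        if (key == null) {
            // referential integrity not satisfied yet: wait for the Patient
            pending.computeIfAbsent(patientId, id -> new ArrayList<>()).add(encounterId);
            return List.of();
        }
        return List.of(encounterId + "@" + key);
    }
}
```

A duplicate Patient keeps its first downstream key, and nothing is re-released because the pending list was cleared.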

But the data is also intrinsically keyed by something other than the patient. 
For example, an Encounter has a key and it is possible to get duplicate copies 
of an Encounter, either as corrections or simple duplicates. Thus it is 
desirable to use the intrinsic key with compacted topics, rather than using the 
patient as the Kafka topic key. While it is possible to key by one thing and 
partition by another using explicit partitioners, that seems both error prone 
and insufficient to keep the data only where it needs to be.
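The "key by one thing, partition by another" option can be sketched as a partition function with the same (key, value, numPartitions) shape as a Kafka Streams StreamPartitioner. This sketch rests on assumptions. It hashes with String.hashCode rather than the murmur2 hash that Kafka's default partitioner applies to the serialized key, and it assumes the patient id travels in the value. It also shows one concrete way the approach is error prone: a tombstone has no value, so its partition cannot be derived.

```java
// Hypothetical sketch: records keyed by encounter id (so compaction dedupes
// corrections and duplicates) but routed by the patient id carried in the value.
class PatientPartitioner {
    /** Illustrative hash only; not Kafka's default (murmur2) partitioning. */
    static int partition(String encounterKey, String patientIdFromValue, int numPartitions) {
        if (patientIdFromValue == null) {
            // A tombstone (null value) carries only the encounter key, so the patient id,
            // and therefore the partition that holds the record, cannot be recovered here.
            throw new IllegalArgumentException("cannot route tombstone for " + encounterKey);
        }
        return Math.floorMod(patientIdFromValue.hashCode(), numPartitions);
    }
}
```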

Specifically, the High-level Streaming DSL does not seem to support the latter 
point. Without the foreign key support discussed here, it is necessary to do 
aggregation and remapping that cause implicit repartitioning. It seems 
determined to move the data around. It is not clear to me whether this KIP 
would eliminate that problem. Note that I found the documentation frustrating: 
this repartitioning was not apparent from it, and was most apparent from 
looking at the set of topics that get implicitly created. 

I would like to see the ability to transform a set of related incoming topics 
into a set of downstream topics including re-keying and sometimes 
renormalization using the high-level Streaming DSL. This seems like it is a 
start towards that, but is it sufficient?


> Support non-key joining in KTable
> -
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api
>
> Today in the Kafka Streams DSL, KTable joins are only based on keys. If users 
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but 
> with a "foreign key" {{a}}, and assuming they are read from two topics which 
> are partitioned on {{a}} and {{b}} respectively, they need to use the 
> following pattern:
> {code}
> // tableBPrime is re-keyed, and hence partitioned, on "a"
> tableBPrime = tableB.groupBy(/* select on field "a" */).aggregate(...);
> tableA.join(tableBPrime, joiner);
> {code}
> Even if these two tables are read from two topics which are already 
> partitioned on {{a}}, users still need to do the pre-aggregation in order to 
> bring the two joining tables onto the same key. This is a drawback in 
> programmability, and we should fix it.
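The workaround in the issue text can be made concrete with an in-memory sketch. It uses plain Java collections, not the Kafka Streams API, and all names are hypothetical. Table B is regrouped by its foreign key a, and then an ordinary key-based join with table A follows. In Kafka Streams, that regrouping step (groupBy on a KTable) is what introduces the extra repartition topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration, not the Kafka Streams API: regroup B by foreign key a,
// then join with A on the now-shared key.
class ForeignKeyJoinSketch {
    /** bTable: b -> foreign key a. Returns a -> list of b keys (the re-keyed "tableB'"). */
    static Map<String, List<String>> groupByForeignKey(Map<String, String> bTable) {
        Map<String, List<String>> byA = new TreeMap<>();
        bTable.forEach((b, a) -> byA.computeIfAbsent(a, k -> new ArrayList<>()).add(b));
        return byA;
    }

    /** Inner, key-based join of A (a -> valueA) with the regrouped B. */
    static Map<String, String> join(Map<String, String> aTable, Map<String, List<String>> bByA) {
        Map<String, String> out = new TreeMap<>();
        aTable.forEach((a, valueA) -> {
            List<String> bs = bByA.get(a);
            if (bs != null) {
                out.put(a, valueA + "+" + bs);
            }
        });
        return out;
    }
}
```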



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2017-01-17 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826346#comment-15826346
 ] 

Jan Filipiak commented on KAFKA-3705:
-

AFAICS the table will also be filled by a different thread; IMO that makes it 
too hard to reason about the execution. If you look into the table twice while 
processing a record, it might have different values. :-( GlobalKTable makes me 
sad.



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2017-01-17 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826332#comment-15826332
 ] 

Jan Filipiak commented on KAFKA-3705:
-

Hi, with regard to what I am trying here, the GlobalKTable is not useful.
It could be useful if its source also emitted change events. (I would 
still have to do the range lookup, but I could save a repartition of my "bigger" 
table.) The current design is not usable here. :'(



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2017-01-17 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826064#comment-15826064
 ] 

Michael Noll commented on KAFKA-3705:
-

[~jfilipiak]: Now that support for global KTables is around the corner (see 
KIP-99 at 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649), 
would that serve all / most of your needs here? I am aware that "non-key 
joining in KTable" and "global KTables" do not fully overlap, but the overlap 
is still quite significant.



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396508#comment-15396508
 ] 

Guozhang Wang commented on KAFKA-3705:
--

Yeah, currently we do not have a windowed stream-table join, and hence only the 
table is materialized, not the stream. We could add this join type in the next 
release, though, if we feel it is a common request.

> Support non-key joining in KTable
> -
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Liquan Pei
>  Labels: api
> Fix For: 0.10.1.0


[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-27 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396498#comment-15396498
 ] 

Jan Filipiak commented on KAFKA-3705:
-

The change doesn't seem to be in all the places, I thought about going 
Ktable,T> but also settled with Ktable the other thing would 
just be generic overkill even though one can hide it from the user by having a 
PairSerde or something. 

Regarding your idea, I kind of fail to see how an update to table1 or table2 
would be reflected in the output with regard to republishing; isn't the only 
option there to have the stream materialized based on a window? I need to take a 
deeper look, though.



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-27 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396491#comment-15396491
 ] 

Guozhang Wang commented on KAFKA-3705:
--

We are fixing this issue right now on the consumer layer with KIP-62: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
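With KIP-62 (Kafka 0.10.1.0), heartbeats come from a background thread, so session.timeout.ms only has to cover liveness, while the new max.poll.interval.ms bounds the time allowed between poll() calls. The following is a hedged sketch of consumer settings for a processor whose scans can run long. The numbers are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Illustrative consumer settings after KIP-62; the values are assumptions only.
class PollIntervalConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Liveness is detected by the background heartbeat thread, so this can stay short.
        props.put("session.timeout.ms", "10000");
        props.put("heartbeat.interval.ms", "3000");
        // Maximum delay between poll() calls before the member leaves the group.
        props.put("max.poll.interval.ms", "600000");
        // Fewer records per poll() means less processing between polls.
        props.put("max.poll.records", "100");
        return props;
    }
}
```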



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-27 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395478#comment-15395478
 ] 

Jan Filipiak commented on KAFKA-3705:
-

Something that is starting to happen to us is that, for low-cardinality columns on 
the join, the prefix scan on RocksDB can return a large number of values. That 
leads to too much time spent between poll() calls and to us losing group 
membership. One could maybe check whether a poll() is needed on the consumer 
during context.forward(). The fix we are currently using, setting the session 
timeout very high, is not that good IMO.
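The prefix scan can be pictured with a sorted in-memory map standing in for RocksDB. This is a hypothetical sketch: keys are assumed to be "<joinKey>|<primaryKey>" and join keys are assumed not to contain '|'. Matches come back in bounded batches, so a caller could yield (for example, to poll()) between batches instead of forwarding every match in one go.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

// Hypothetical sketch: batched prefix scan over "<joinKey>|<primaryKey>" keys.
class PrefixScanSketch {
    /** Returns up to maxRows keys with the given join-key prefix, after resumeAfter if non-null. */
    static List<String> scanBatch(NavigableMap<String, String> store, String joinKey,
                                  String resumeAfter, int maxRows) {
        String prefix = joinKey + "|";
        // '}' is the character right after '|', so every key with the prefix sorts below this bound.
        String upper = joinKey + "}";
        NavigableMap<String, String> range = resumeAfter == null
                ? store.subMap(prefix, true, upper, false)
                : store.subMap(resumeAfter, false, upper, false);
        List<String> batch = new ArrayList<>();
        for (String key : range.keySet()) {
            if (batch.size() == maxRows) {
                break; // stop here; the caller resumes from the last returned key
            }
            batch.add(key);
        }
        return batch;
    }
}
```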



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365365#comment-15365365
 ] 

Guozhang Wang commented on KAFKA-3705:
--

[~jfilipiak] After talking with you offline, I am convinced that this combo-key is 
necessary to avoid out-of-order issues. I have updated the design proposal 
wiki accordingly: 
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins,
 feel free to take a look.

Just a random thought for your use case specifically: do relations A, B, and 
C all need to be captured as KTables (i.e. the records are a binlog, etc. from 
some database table)? If the one with the foreign key can be captured as just a 
stream (i.e. KStream), then you can re-model your computation as 
{{(stream Join table1) Join table2}}, where {{stream Join table}} returns a 
stream. And in the Kafka Streams DSL you can just do 
{{stream.selectKey(table1.key).join(table1).selectKey(table2.key).join(table2)}}.
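The re-modelled computation can be sketched in memory: each stream event is re-keyed to table1's key and looked up, then re-keyed to table2's key and looked up again. This is plain Java, not the Kafka Streams API, and the event layout {eventId, table1Key, table2Key} is a hypothetical choice. Note the design trade-off: a stream-table join reads the table state at the time the event is processed, so a later update to table1 or table2 does not re-emit earlier results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// In-memory sketch of (stream JOIN table1) JOIN table2; names are hypothetical.
class StreamTableChainSketch {
    static List<String> enrich(List<String[]> events, Map<String, String> table1,
                               Map<String, String> table2) {
        List<String> out = new ArrayList<>();
        for (String[] e : events) {           // e = {eventId, table1Key, table2Key}
            String v1 = table1.get(e[1]);     // like selectKey(table1.key).join(table1)
            if (v1 == null) continue;         // inner join: drop unmatched events
            String v2 = table2.get(e[2]);     // like selectKey(table2.key).join(table2)
            if (v2 == null) continue;
            out.add(e[0] + ":" + v1 + ":" + v2);
        }
        return out;
    }
}
```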



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359835#comment-15359835
 ] 

Guozhang Wang commented on KAFKA-3705:
--

Yes, this does finally clarify your scenario, thanks!

I think the Change<> pair can still help in your case, because it has the 
benefit that, for aggregations for example, you have the clear information 
"subtract the old value, and add the new value" instead of depending on whether 
the returned value is null. For example, the Streams DSL defines the 
aggregation operator in the following way (note that in your customized 
implementation you do not need to strictly follow the same pattern; this is just to 
illustrate the idea):

{code}
<T> KTable<K, T> aggregate(Initializer<T> initializer,
                           Aggregator<K, V, T> adder,
                           Aggregator<K, V, T> subtractor, ...);
{code}
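The subtract-old / add-new contract can be illustrated with a simple count. This minimal sketch uses plain Java and hypothetical names. When an update moves a row from one group to another, the old side of the change pair is subtracted from its group and the new side is added to the other. A null side (insert or delete) skips that call.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of applying a change pair {old, new} with a subtractor and an adder,
// here counting table rows per group.
class ChangePairCountSketch {
    final Map<String, Integer> countsByGroup = new HashMap<>();

    /** oldGroup/newGroup: the grouping field of the row before and after the update. */
    void apply(String oldGroup, String newGroup) {
        if (oldGroup != null) {
            countsByGroup.merge(oldGroup, -1, Integer::sum); // subtractor on the old value
        }
        if (newGroup != null) {
            countsByGroup.merge(newGroup, 1, Integer::sum);  // adder on the new value
        }
    }
}
```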


Let me try again with your example code and the aggregation pattern in the 
above Streams DSL, and if you do not agree, let's have a small Skype chat :)


1. Suppose your current value in Table B is

B.PK => B.V.old, which contains A.PK.old, C.PK, etc.

And when you join tables A and B, you repartition the stream B by A.PK while 
still maintaining the message format as B.PK => B.V, and the join result is in 
the format of: 

B.PK => joined(B.V, A.V)


2. Now suppose you have an update on Table B, as B.PK => B.V.new, which 
contains A.PK.new, C.PK (same value), etc. And suppose it is represented as a 
change pair of {old, new}, i.e.

B.PK => {B.V.old, B.V.new}, or more specifically:

B.PK => {(A.PK.old, C.PK, ...), (A.PK.new, C.PK, ...)}


3. When you repartition it based on the A.PK value, this will result in two pairs 
being sent to potentially two different partitions:

B.PK => {B.V.old, null}   (sent to partition1)

B.PK => {null, B.V.new}   (sent to partition2)


4. These two records will be joined independently by two processors, each 
fetching one of the re-partitioned topic partitions, and the result is:

B.PK => {joined(B.V.old, A.V.old), null}   (here A.V.old corresponds to the 
value for key A.PK.old in Table A)

B.PK => {null, joined(B.V.new, A.V.new)}   (here A.V.new corresponds to the 
value for key A.PK.new in Table A)


They will then be sent to the second topic, which is partitioned on C.PK, and 
since their C.PK value is the same, they will be sent to the same partition, 
but in arbitrary order.


5. The aggregation function consumes from the second re-partition topic based 
on C.PK and does the aggregation by 1) calling a subtract function on the old 
value of the pair, and then 2) calling an add function on the new value of the 
pair, skipping a call if its value is null. More specifically, the 
subtract / add functions look like:

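{code}
// listToMap: helper that indexes the current aggregate list by key (not shown)
List<Map.Entry<C, Joined>> subtract(C key, Joined value, List<Map.Entry<C, Joined>> current)
{
   Map<C, Joined> m = listToMap(current);
   m.remove(key);
   return new ArrayList<>(m.entrySet());
}

List<Map.Entry<C, Joined>> add(C key, Joined value, List<Map.Entry<C, Joined>> current)
{
   Map<C, Joined> m = listToMap(current);
   m.put(key, value);
   return new ArrayList<>(m.entrySet());
}
{code}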

Depending on the order in which these two records are received, we will either call 
{{subtract(C.PK, joined(B.V.old, A.V.old), current)}} first and then 
{{add(C.PK, joined(B.V.new, A.V.new), current)}}, or vice versa; either way 
it is correct, since {{B.V.old}} and {{B.V.new}} are different keys.



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-07-01 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566
 ] 

Jan Filipiak commented on KAFKA-3705:
-

Hi, yes, that is kind of where I am coming from. I completely understand where you 
are. 
Doing the changelog case (logging Change<> objects) is just one 
implementation of this repartitioning, and mine is another one. I am very 
familiar with my approach, as I wrote some Samza apps using it. It 
has many benefits that may or may not be of interest (repartition topics can 
also be used to bootstrap, fewer copies of the data (no need to make state HA, 
see previous), etc.). What we are still missing here is a mutual understanding 
of what I think key widening does and how to expose that to users in a 
non-insane manner.

Maybe I'll try it with your JSON syntax. This is the very example we have, and 
this ticket's feature would allow me to build it at the DSL level of the 
API.

So let's say I have 3 tables, A, B, and C. I want to reach a point where I have 
C.PK => list of joined records; this will then be read by our application servers, 
which serve it as a faster way to retrieve this data than, let's say, the original 
MySQL. B has foreign keys into A and C.

All tables start off as one topic each, keyed by the table's primary key:
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to repartition B by A.PK now, in the first example without a widened 
key. Then it stays B.PK => B, but partitioned by A.PK accordingly.

Then I can do the join with A and get
B.PK => joined

as in your previous comment:
{quote}
Then a join result of
{a="a1", joined = join("a1-pre", "c1")} 
{quote}
Note the key stays B.PK (unwidened).
Now I am going to repartition based on C.PK, still maintaining
B.PK => joined
as the topic layout. 
Now, shit hits the fan as I do my aggregation to become 
C.PK => list of (B.PK, joined) entries

What would this aggregator look like now?

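{code:java}
List<Map.Entry<B, Joined>> apply(B key, Joined value, List<Map.Entry<B, Joined>> current)
{
   // index the current aggregate by B.PK (listToMap and bKeyExtractor are helpers, not shown)
   Map<B, Joined> m = listToMap(current, bKeyExtractor);
   if (value == null) {
      m.remove(key);      // delete: B.PK left this C.PK group
   } else {
      m.put(key, value);  // upsert the joined record for B.PK
   }
   return new ArrayList<>(m.entrySet());
}
{code}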

This wouldn't be much different with logged Change<> objects; only the remove 
and add would be two methods. The problem is that it doesn't look wrong, but 
this code now has race conditions. Think about an update to the A.PK field of a 
B record that forces it to switch partitions (the C.PK value remains). Then we 
publish a delete to the old partition and the new value to the new partition. 
Then we do the join. Then we repartition on the unchanged C.PK. This will make 
our code above see B.PK => null (remove) and B.PK => joined (add) in no 
particular order. Hence the output is undefined. If the API had forced us to 
widen the key to be Joined, the error would not happen, and users would be 
aware of what happens on repartitioning. I thought this through, and it also 
happens with logging Change<>, as it is really just another implementation.
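The race can be reproduced deterministically by applying the two records in both orders. The sketch below uses plain Java with hypothetical key and value strings. It compares an aggregate indexed by B.PK alone with one indexed by a widened key. It assumes, as in the combo-key discussion in this thread, that the widened key carries A.PK alongside B.PK, so the delete and the upsert no longer touch the same entry.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the ordering race: a delete (from the old A.PK partition) and an upsert
// (from the new one) for the same B.PK reach the C.PK aggregator in either order.
class KeyWideningSketch {
    /** Narrow key: the aggregate is indexed by B.PK alone. */
    static Map<String, String> narrow(boolean deleteFirst) {
        Map<String, String> agg = new TreeMap<>();
        agg.put("b1", "joined(b1,aOld)");                  // state before the A.PK update
        Runnable delete = () -> agg.remove("b1");
        Runnable upsert = () -> agg.put("b1", "joined(b1,aNew)");
        if (deleteFirst) { delete.run(); upsert.run(); } else { upsert.run(); delete.run(); }
        return agg;
    }

    /** Widened key: indexed by (A.PK, B.PK), so the two records touch different entries. */
    static Map<String, String> widened(boolean deleteFirst) {
        Map<String, String> agg = new TreeMap<>();
        agg.put("aOld|b1", "joined(b1,aOld)");
        Runnable delete = () -> agg.remove("aOld|b1");
        Runnable upsert = () -> agg.put("aNew|b1", "joined(b1,aNew)");
        if (deleteFirst) { delete.run(); upsert.run(); } else { upsert.run(); delete.run(); }
        return agg;
    }
}
```

With the narrow key, the upsert-then-delete order loses the record entirely; with the widened key, both orders converge on the same state.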

I hope this finally clarifies the key widening I am talking about. If not, 
maybe we should have a small Skype call or something. 
My further recommendation is to not implement these joins as logged Change<> 
objects, as that is more resource-intensive and less efficient, and also makes 
the API more complicated.

PS: Hive has seen all join types: MapJoins, skewed joins, you name it. 
All of these are applicable to streams as well. Maybe keep them in the back of 
your head.









[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-06-30 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357696#comment-15357696
 ] 

Guozhang Wang commented on KAFKA-3705:
--

Yeah, this clarifies a lot. I see that you are not trying to create a "table" 
inside Kafka Streams, but just want to re-partition the input binlog stream, 
and you are working with the lower-level APIs (originally I was confused since 
you mentioned "KTable", which is only available in the higher-level DSL). Here 
is my understanding / suggestions:

1. The original Kafka topic that was directly piped from your MySQL binlog has 
the form (key -> value)

a -> {a', other-fields}

and you want to repartition it into a new topic on field a', which is also log 
compacted on a'.


2. So instead of representing a delete / update record as "a -> null" / "a -> 
a'-new, other-fields-new", you can send the messages in a different format 
(there are already a few tools, including Kafka Connect, which allow you to 
represent your binlog entries with such flexibility):

a -> {pair{a'-old, a'-new}, other-fields-current-value }

So that deletion becomes:   a -> {pair{a'-old, null}, other-fields}


3. In this case, you can access the value field to extract a' for partitioning 
even in deletion cases, and for an update record you can then send two 
records to the re-partition topic, as follows:


a'-old -> null,

a'-new -> other-fields.


Will that work for you?
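Step 3 of the proposal can be sketched as a function that turns one binlog record carrying both the old and new a' into the records sent to the repartition topic keyed on a'. This is a minimal sketch with types simplified to strings, and a null value stands for a tombstone. One design choice differs from a literal reading of step 3: when a' did not change, only the upsert is sent.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: binlog record {pair{a'-old, a'-new}, other-fields} -> repartition records keyed on a'.
class RepartitionSplitSketch {
    static List<Map.Entry<String, String>> split(String aPrimeOld, String aPrimeNew, String otherFields) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        if (aPrimeOld != null && !aPrimeOld.equals(aPrimeNew)) {
            out.add(new AbstractMap.SimpleEntry<>(aPrimeOld, null));         // a'-old -> null (tombstone)
        }
        if (aPrimeNew != null) {
            out.add(new AbstractMap.SimpleEntry<>(aPrimeNew, otherFields));  // a'-new -> other-fields
        }
        return out;
    }
}
```

A deletion, a -> {pair{a'-old, null}, other-fields}, yields only the tombstone for a'-old.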





[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-06-27 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352103#comment-15352103
 ] 

Jan Filipiak commented on KAFKA-3705:
-

I will just shoot a quick reply now; time somehow became scarce recently. 
Anyhow, the bottom line of our misunderstandings is always the same thing. My 
bad that I didn't see the wiki page; if that range-query interface is addressed, 
that's nice :D.

Point 3 is the one that causes the most confusion, I guess. In the repartition 
case we follow different paths, and I am not sure I was able to 
communicate mine well enough. I <3 the idea of having everything as a derived 
store. ITE all this is being used to {{tail -F mysql-.bin | kafka | XXX | redis}}, 
so Redis becomes a derived store of MySQL which can be used for 
NoSQL-style reads. I am in fact such a great fan of this concept that I tend to 
treat everything as a derived store. For me this means a repartitioned topic is a 
derived store of the source topic. This stands in contrast to making a changelog 
out of it and materializing the changelog in, say, RocksDB. This leads to the 
"problem" that the changelog topic is not a derived store anymore, which 
personally gives me a bad feeling; it just pushes me out of my comfort zone. 
Confluent peeps seem to be in their comfort zone with changelog topics. In 
my narrative, shit hits the fan when the property of being a derived store is 
lost. It leads to all the nasty things, like needing to changelog your 
RocksDBs, as the intermediate topic won't hold stuff forever. 

In contrast to having a changelog topic that I re-materialize and then 
change-capture again, I prefer to do the change capturing first and only 
maintain the state of which downstream partitions a record is currently 
published to. This works cleanly and nicely, but it brings with it what I call 
"key widening". Say I have KTable A and I want to repartition it to A' so that 
the topic containing A' is a derived store and log-compacted. Then I can't 
simply use the new key to do this, for two reasons. First, the 
StreamPartitioner can only access the key to determine the partition to delete 
from (deletes come as null values), which means the fields that determine the 
partition need to be in the key no matter what. Snippet:
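{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

topology.addSink(name, repartitionTopicName, new StreamPartitioner<K, VR>() {
    private final Serializer<KL> intermediateSerializer = intermediateSerde.serializer();

    @Override
    public Integer partition(K key, VR value, int numPartitions) {
        // The value is null for deletes, so the partition must come from the key alone.
        KL newKey = intermediateKeyExtractor.apply(key);
        // Copied from the default partitioner; I didn't want to create a Cluster object here to reuse it.
        byte[] keyBytes = intermediateSerializer.serialize(repartitionTopicName, newKey);
        // Clear the sign bit before the modulo so the result is always in [0, numPartitions).
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}, repartitionProcessorName);
{code}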

As you can see, the resulting key K contains KL (the key of the 
not-repartitioned table).
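The key-widening idea behind the snippet can be reduced to a dependency-free sketch: the repartition key carries both the join key and the original key, and the partitioner reads only the join key, so an insert and its later delete (null value) are routed to the same partition. WidenedKey and partition are hypothetical names, and Objects.hashCode is a stand-in for the murmur2 hash over serialized bytes that the real partitioner uses.

```java
import java.util.Objects;

// Hypothetical sketch of a widened repartition key and a key-only partitioner.
final class WidenedKeyPartitioning {

    static final class WidenedKey<KL, KA> {
        final KL joinKey;     // decides the partition
        final KA originalKey; // keeps the record unique within that partition

        WidenedKey(KL joinKey, KA originalKey) {
            this.joinKey = joinKey;
            this.originalKey = originalKey;
        }
    }

    // The value is deliberately ignored: for deletes it is null.
    static <KL, KA, V> int partition(WidenedKey<KL, KA> key, V value, int numPartitions) {
        // Stand-in hash; masking the sign bit keeps the result in [0, numPartitions).
        return (Objects.hashCode(key.joinKey) & 0x7fffffff) % numPartitions;
    }
}
```

Because originalKey never influences the partition, every row sharing a join key is co-located with the other table's row for that key, while the full widened key still identifies each row individually.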

The second reason why this key must be there is that one needs to be able to 
build a derived stream A''. But since in A' a record can "move" from partition 
X to Y, there is a race condition between the "insert" in Y and the delete in 
X. The processor repartitioning for A'' needs to treat them as different keys; 
if they were the same key, the delete might wipe out the new value. This also 
puts downstream consumers of A'' in the weird position that, at any point in 
time, there can be as many A-keys with the same value as there are A' 
partitions minus one, or a specific A key might vanish completely and then 
reappear. That is sometimes awkward to work around in the end application. But 
there are enough strategies to solve at least the multiple-A-keys case, not so 
much for the complete-vanishing case. I hope this clarifies things. 
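The race between the insert in Y and the delete in X can be replayed with two plain maps. This is an illustrative sketch with hypothetical names; the keys "X|a" and "Y|a" model a widened key made of the join-key value that routed the row to partition X or Y plus the original key a.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the row for key "a" moved from partition X to partition Y,
// and downstream sees the insert from Y before the delete from X.
final class MoveRaceDemo {

    // Upsert when value != null, delete when value == null.
    static <K> void apply(Map<K, String> store, K key, String value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    // Downstream keyed by the original key only: the late delete wipes the new row.
    static Map<String, String> keyedByOriginalKey() {
        Map<String, String> store = new HashMap<>();
        apply(store, "a", "v1");  // row as first published via partition X
        apply(store, "a", "v2");  // insert arriving from partition Y
        apply(store, "a", null);  // delayed delete arriving from partition X
        return store;
    }

    // Downstream keyed by the widened key: the delete only retracts the X copy.
    static Map<String, String> keyedByWidenedKey() {
        Map<String, String> store = new HashMap<>();
        apply(store, "X|a", "v1");
        apply(store, "Y|a", "v2"); // briefly two rows exist for "a" (the multiple A-keys case)
        apply(store, "X|a", null);
        return store;
    }
}
```

With the original key alone, the store ends up empty even though "a" still exists; with the widened key, only "Y|a" -> "v2" remains.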






[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-06-20 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340814#comment-15340814
 ] 

Guozhang Wang commented on KAFKA-3705:
--

Thanks for the feedback!

Re 1: Not sure I fully understand this. I thought you can pass a 
{{StreamPartitioner}} when calling {{addSink}} which should be sufficient?

Re 2: We are aware of this, and as discussed in the wiki, our current proposal 
is to use something similar to what you mentioned as {{range(K1 prefix)}} and 
check {{key.startsWith(prefix)}} to stop iterating. There are some 
optimizations for prefix seeking in RocksDB, but we need to contribute back to 
RocksDB's JNI to make use of them.

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly
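The seek-then-stop iteration from Re 2 can be sketched with a java.util.TreeMap standing in for the sorted RocksDB store. This is an assumption for illustration, not the Kafka Streams store API: the scan seeks to the first key not smaller than the prefix and stops at the first key that no longer starts with it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Sketch: emulate range(prefix) on a sorted key-value store.
final class PrefixScan {

    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> values = new ArrayList<>();
        // tailMap(prefix, true) begins at the first key >= prefix (the "seek").
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // keys are sorted, so no later key can match
            }
            values.add(e.getValue());
        }
        return values;
    }
}
```

Because the comparison is purely lexicographic, the prefix should end at a field boundary: with keys "b1|a1", "b1|a2", "b10|a3", the prefix "b1|" returns only the first two, while "b1" would also match "b10|a3".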

Re 3: The idea is that for the repartitioning, we are going to first transform 
the old key-value pair  into >, but when sending the 
key-value pair to the re-partition topic, specify the {{StreamPartitioner}} to 
partition based on combo  (remember its {{partition}} method takes both the 
key and value), and let the joiner after the repartitioning be applied on  
only.

When the old value needs to be sent as well, we are going to send the {new, old} 
pair separately as two records: >, and >, and still partition on combo  and . These two 
records may be sent to two different partitions and hence processed by two 
different processors, which is expected behavior. Does that look reasonable to 
you?



[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-06-20 Thread Jan Filipiak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340779#comment-15340779
 ] 

Jan Filipiak commented on KAFKA-3705:
-

A few things I came across while building the current implementation based on 
the Processor API.

1. Partitioning
I ended up needing to pass an additional ValueMapper into the method. I had to 
use it in the sink's partitioner to extract the _partition/join-key_ from the 
key that is used for the repartition topic. It had to be extracted from the 
key because I still need to be able to route null values to the correct 
partition for deletes. This came from knowing the number of partitions only in 
the partitioner, not in the processor, and it made the "API" rather 
complicated. 

2. Range Select
The ValueMapper mentioned above also had to be passed into the 
RocksDBIterator. Having KeyValueIterator range(K from, K to) is not "natural" 
for prefix range queries. A better fit is KeyValueIterator range(K1 prefix), 
where the serde for K1 needs to produce the prefix bytes of the serde for K.

3. Key expansion
After a join in this fashion, the key is what I started referring to as 
widened. Say you have KTable and it is the table that needs to be 
repartitioned, and KP is the repartition key; then, independently of the other 
table, the new key of the table must include KR and AK, which is a weird thing 
compared to the traditional relational database way. Imagine having a result 
table as KTable,Pair>: then the formerly unique key AK is not unique anymore, 
and the processor might see the insert in one partition before the delete in 
the other (e.g. when the row's KP was updated). I think this should be 
embraced, because that is how it is. It should just be made apparent to the 
user, as it needs to be dealt with in downstream processors.
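For point 2, the requirement that the serde of the prefix type produce the leading bytes of the full serialized key can be met with a length-prefixed encoding. This is a minimal sketch; the encoding and the name PrefixKeyEncoding are assumptions for illustration, not what Kafka Streams does.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: serialize a composite key (K1 prefix, K2 rest) so that the
// bytes of the prefix alone are a byte-prefix of the full serialized key.
final class PrefixKeyEncoding {

    // A 4-byte length header in front of K1 keeps "b1" from being a prefix of "b10".
    static byte[] serializePrefix(String k1) {
        byte[] p = k1.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + p.length).putInt(p.length).put(p).array();
    }

    static byte[] serializeFull(String k1, String k2) {
        byte[] head = serializePrefix(k1);
        byte[] tail = k2.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(head.length + tail.length).put(head).put(tail).array();
    }

    // The check a prefix scan applies to each serialized store key.
    static boolean startsWith(byte[] bytes, byte[] prefix) {
        if (prefix.length > bytes.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (bytes[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```

The length header means keys sort by prefix length before content, which is harmless for prefix scans but would matter for ordered range queries across different prefixes.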

Unrelated to the topic of joining: the Processor API is not always 
comfortable. I appreciate the beauty of the threading model, but stitching 
graphs together based on processor names and strings is trickier than I 
thought. Anyhow, a really nice stream processing framework. It feels and looks 
so much better than what else is out there, such as Spark or Storm. Watching 
their desperate attempts to put state in is a joy. Nice work. As soon as our 
implementation is hardened in production, I'll probably be able to share it.




[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

2016-06-03 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314643#comment-15314643
 ] 

Guozhang Wang commented on KAFKA-3705:
--

Another thing we may want to consider is to use a different memtable option in 
RocksDB: 
https://github.com/facebook/rocksdb/wiki/Hash-based-memtable-implementations
