Guozhang Wang created KAFKA-13371:
-------------------------------------

             Summary: Consider consolidating Joined / StreamJoined / TableJoined
                 Key: KAFKA-13371
                 URL: https://issues.apache.org/jira/browse/KAFKA-13371
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


This is an idea that came up while reviewing KAFKA-13261 (adding TableJoined). 
We now have three control objects: Joined, StreamJoined, and TableJoined. All 
of them implement NamedOperation and hence have a `name` field, which is used 
for the processor node's name and potentially for store names. In addition:

* Joined: used in stream-table joins. Contains key and two value serdes used 
for serializing the bytes for repartitioning (however since today we only 
repartition one side if needed, the other value serde is never used). 
* StreamJoined: used in stream-stream joins. It includes the serdes, AND also 
the store suppliers and other control variables on the store names.
* TableJoined: used in table-table foreign key joins. It does not include any 
serdes but includes the partitioner information.

The main differences between these constructs are:

* KTables have a built-in materialization mechanism via the 
`valueGetterSupplier` whenever they are created, either from a source or from 
aggregate / join operators, so they do not need extra materialization 
indicators when participating in a follow-up join: they are either already 
materialized by the operators that generate them, or they "grandfather" back 
to the upstream KTable on the fly, with a logical view that is fetched via the 
`ValueGetterSupplier`. KStreams, on the other hand, have no inherent 
materialization mechanism, so operators that need to materialize the streams 
must provide such methods themselves.
* Table-table foreign-key joins have a special need for partitioners.
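
To make the "grandfather" behaviour above concrete, here is a minimal, self-contained sketch. It is not Kafka Streams code: `ValueGetter`, `Table`, `MaterializedTable` and `MappedView` are hypothetical stand-ins invented for illustration. It only shows the idea that a non-materialized table can answer lookups by delegating to its upstream table and applying its own transformation on the fly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ValueGetterSketch {

    /** Hypothetical stand-in for a value getter: looks up the current value for a key. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Hypothetical table node: every table can hand out a getter, materialized or not. */
    interface Table<K, V> {
        ValueGetter<K, V> valueGetter();
    }

    /** A table backed by its own store (think: a source table or an aggregate result). */
    static final class MaterializedTable<K, V> implements Table<K, V> {
        final Map<K, V> store = new HashMap<>();

        @Override
        public ValueGetter<K, V> valueGetter() {
            return store::get;
        }
    }

    /** A mapValues-style table with no store of its own: reads go to the parent. */
    static final class MappedView<K, V, VR> implements Table<K, VR> {
        private final Table<K, V> parent;
        private final Function<V, VR> mapper;

        MappedView(Table<K, V> parent, Function<V, VR> mapper) {
            this.parent = parent;
            this.mapper = mapper;
        }

        @Override
        public ValueGetter<K, VR> valueGetter() {
            ValueGetter<K, V> upstream = parent.valueGetter();
            // Compute the view lazily: fetch from upstream, then apply the mapper.
            return key -> {
                V value = upstream.get(key);
                return value == null ? null : mapper.apply(value);
            };
        }
    }

    public static void main(String[] args) {
        MaterializedTable<String, Integer> source = new MaterializedTable<>();
        source.store.put("a", 2);

        Table<String, Integer> doubled =
            new MappedView<String, Integer, Integer>(source, v -> v * 2);

        System.out.println(doubled.valueGetter().get("a"));       // prints 4
        System.out.println(doubled.valueGetter().get("missing")); // prints null
    }
}
```

A stream has no equivalent of `valueGetter()` in this model, which is why a join on streams needs its store configuration supplied from outside.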

[~vvcephei] has a good proposal for 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar and 
as part of that proposal we could consider adding a partitioner for source 
streams / tables and inheriting it throughout the topology pipeline. Following 
that idea, we can consider consolidating the above "Joined" objects by 
isolating the materialization / partitioner variables. More specifically, 
here's a concrete proposal:

1) `StreamsBuilder.table/stream` would pass in an optional partitioner.
2) Similarly, all operators that change the key would allow an optional 
partitioner:
2.a) `KStream.repartition/groupBy` and `KTable.groupBy` would allow an optional 
partitioner in `Repartitioned`; as a piggy-back, we would also deprecate 
`Grouped` in favor of `Repartitioned`, since the latter would subsume the 
former.
2.b) `KStream.map/flatMap/selectKey` stays as is, and similar to serdes, these 
operators would stop the inheritance of partitioners of the upstream entities.
3) `Repartitioned` would also add the key/value serdes used for serializing 
for the repartition topics.
4) `KStream.join(KTable)` and `KStream.join(KStream)` would pass in an optional 
`Repartitioned` in addition to `Joined` which can be used to encode the 
partitioner info.
5) Foreign-key `KTable.join(KTable)` would pass in an optional `Repartitioned` 
which can be used to encode the partitioner info.
6) As a result of all the above points, we can then reduce `StreamJoined` / 
`TableJoined` / `Joined`, since all their wrapped control objects are now 
separated into `Repartitioned` and `Materialized`. Note that for 
`StreamJoined`, the store suppliers / names / configs would now be wrapped in 
two `Materialized` objects, which would still not be exposed for IQ.
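
The inheritance rule in points 1), 2.a) and 2.b) can be sketched as follows. This is a toy model under stated assumptions, not the Kafka Streams API: `Partitioner`, `Node`, `keyPreserving`, `keyChanging` and `repartition` are hypothetical names made up for this example. It only illustrates that a partitioner set at the source would flow through key-preserving operators, be dropped by key-changing ones, and be re-established by an explicit repartition.

```java
import java.util.Optional;

public class PartitionerInheritanceSketch {

    /** Hypothetical partitioner: maps a key and a partition count to a partition. */
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Hypothetical stream/table node carrying the partitioner it inherited, if any. */
    static final class Node<K> {
        final String name;
        final Optional<Partitioner<K>> partitioner;

        Node(String name, Optional<Partitioner<K>> partitioner) {
            this.name = name;
            this.partitioner = partitioner;
        }

        /** Key-preserving operators (e.g. filter, mapValues) pass the partitioner on. */
        Node<K> keyPreserving(String opName) {
            return new Node<K>(opName, partitioner);
        }

        /** Key-changing operators (map, flatMap, selectKey) stop the inheritance, as in 2.b). */
        <KR> Node<KR> keyChanging(String opName) {
            return new Node<KR>(opName, Optional.empty());
        }

        /** An explicit partitioner on repartition, as in 2.a), starts a new chain. */
        <KR> Node<KR> repartition(String opName, Partitioner<KR> explicit) {
            return new Node<KR>(opName, Optional.of(explicit));
        }
    }

    public static void main(String[] args) {
        Partitioner<String> byLength = (key, n) -> key.length() % n;

        // Point 1): the source is created with an optional partitioner.
        Node<String> source = new Node<String>("source", Optional.of(byLength));
        Node<String> filtered = source.keyPreserving("filter");
        Node<Integer> rekeyed = filtered.<Integer>keyChanging("selectKey");
        Node<Integer> repartitioned =
            rekeyed.<Integer>repartition("repartition", (key, n) -> Math.floorMod(key.intValue(), n));

        System.out.println(filtered.partitioner.isPresent());      // prints true
        System.out.println(rekeyed.partitioner.isPresent());       // prints false
        System.out.println(repartitioned.partitioner.isPresent()); // prints true
    }
}
```

Under this model a join would only need to consult the partitioner carried by its inputs, with an optional `Repartitioned`-style override, rather than carry its own partitioner fields.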







--
This message was sent by Atlassian Jira
(v8.3.4#803005)
