[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7502:
-------------------------------
    Priority: Major  (was: Minor)

> Cleanup KTable materialization logic in a single place
> ------------------------------------------------------
>
>                 Key: KAFKA-7502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7502
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Lee Dongjin
>            Priority: Major
>
> Today since we pre-create all the `KTableXXX` operator along with the logical 
> node, we are effectively duplicating the logic to determine whether the 
> resulted KTable should be materialized. More specifically, the 
> materialization principle today is that:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name. We always materialize.
> 2) If users specified Materialized in the DSL but not contains a queryable 
> name, or if users do not specify a Materialized object at all, Streams may 
> choose to materialize or not. But in any cases, even if the KTable is 
> materialized it will not be queryable since there's no queryable name (i.e. 
> only storeName is not null, but queryableName is null):
> 2.a) If the resulted KTable is from an aggregation, we always materialize 
> since it is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulted KTable is from a source topic, we delay the 
> materialization until the downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulted KTable if from a join, we always materialize. However 
> this can be optimized similar to 2.b) but is orthogonal to this ticket (see 
> `KTableImpl#buildJoin` where we always use constructor with nameProvider != 
> null).
> 2.d) If the resulted KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> ------------
> Now, in all of these cases, we have logical node like "KTableKTableJoinNode", 
> as well as physical node like `ProcessorNode`. Ideally we should always 
> create the logical Plan (i.e. the StreamsGraph), and then optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology), however 
> today we create some physical nodes beforehand, and the above logic is hence 
> duplicated in the creation of both physical nodes and logical nodes. For 
> example, in `KTableKTableJoinNode` we check if Materialized is null for 
> adding a state store, and in `KTableImpl#doJoin` we check if materialized is 
> specified (case 2.c) above). 
> Another example is in TableProcessorNode which is used for 2.d) above, in 
> which it includes the logic whereas its caller, `KTableImpl#doFilter` for 
> example, also contains the logic when deciding to pass `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-vulnerable since we may update the logic in one class but forgot 
> to update the other class.
> --------------
> What we want to have is a cleaner code path similar to what we have for 2.b), 
> such that when creating the logical nodes we keep track of whether 1) 
> materialized is specified, and 2) queryable name is provided. And during 
> optimization phase, we may change the inner physical ProcessorBuilder's 
> parameters like queryable name etc, and then when it is time to generate the 
> physical node, we can just blindly take the parameters and go for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to