[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7502:
---------------------------------
    Description: 
Today, since we pre-create all the `KTableXXX` operators along with the logical 
node, we are effectively duplicating the logic that determines whether the 
resulting KTable should be materialized. More specifically, the materialization 
principle today is:

1) If users specify Materialized in the DSL and it contains a queryable name, 
we always materialize.
2) If users specify Materialized in the DSL but it does not contain a queryable 
name, or if users do not specify a Materialized object at all, Streams may 
choose to materialize or not. In either case, even if the KTable is 
materialized it will not be queryable since there is no queryable name (i.e. 
only storeName is not null, but queryableName is null):
2.a) If the resulting KTable is from an aggregation, we always materialize since 
the store is needed for storing the aggregation (i.e. we use the 
MaterializedInternal constructor with nameProvider != null).
2.b) If the resulting KTable is from a source topic, we delay the 
materialization until a downstream operator requires this KTable to be 
materialized or to send old values (see `KTableSourceNode` and `KTableSource`).
2.c) If the resulting KTable is from a join, we always materialize if users 
create a Materialized object, even without a queryable name. This could 
be optimized similarly to 2.b), but that is orthogonal to this ticket (see 
`KTableImpl#buildJoin`, where we always use the constructor with nameProvider != 
null).
2.d) If the resulting KTable is from a stateless operation like filter / 
mapValues, we never materialize.
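
The rules above can be condensed into a single decision function. The following is only a minimal sketch of that idea: `MaterializationRules`, `Origin`, `shouldMaterialize`, `isQueryable` and the `downstreamNeedsStore` flag are hypothetical names made up for illustration and do not exist in Kafka Streams.

```java
// Hypothetical sketch: none of these names exist in Kafka Streams.
final class MaterializationRules {

    /** Which kind of operation produced the KTable (cases 2.a to 2.d). */
    enum Origin { AGGREGATION, SOURCE, JOIN, STATELESS }

    /**
     * Returns true if the resulting KTable is materialized under the rules above.
     *
     * @param materializedSpecified whether the user passed a Materialized object
     * @param queryableName         the user-provided queryable name, or null
     * @param downstreamNeedsStore  whether a downstream operator requires the table
     *                              to be materialized or to send old values (2.b only)
     */
    static boolean shouldMaterialize(Origin origin,
                                     boolean materializedSpecified,
                                     String queryableName,
                                     boolean downstreamNeedsStore) {
        // Rule 1: Materialized with a queryable name always materializes.
        if (materializedSpecified && queryableName != null) {
            return true;
        }
        // Rule 2: no queryable name, so the decision depends on the origin.
        switch (origin) {
            case AGGREGATION: return true;                  // 2.a
            case SOURCE:      return downstreamNeedsStore;  // 2.b
            case JOIN:        return materializedSpecified; // 2.c
            case STATELESS:   return false;                 // 2.d
            default: throw new IllegalArgumentException("unknown origin: " + origin);
        }
    }

    /** A materialized table is queryable only if a queryable name was given. */
    static boolean isQueryable(boolean materializedSpecified, String queryableName) {
        return materializedSpecified && queryableName != null;
    }

    public static void main(String[] args) {
        System.out.println(shouldMaterialize(Origin.JOIN, true, null, false));
        System.out.println(shouldMaterialize(Origin.STATELESS, false, null, false));
    }
}
```

Keeping the decision in one function like this is the property the ticket asks for: a change to any rule would be made in exactly one place.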

------------

Now, in all of these cases, we have a logical node like `KTableKTableJoinNode` 
as well as a physical node like `ProcessorNode`. Ideally we should always create 
the logical plan (i.e. the StreamsGraph), then optimize it if necessary, 
and then generate the physical plan (i.e. the Topology). However, today we 
create some physical nodes beforehand, and the above logic is hence duplicated 
in the creation of both physical nodes and logical nodes. For example, in 
`KTableKTableJoinNode` we check if Materialized is null before adding a state 
store, and in `KTableImpl#doJoin` we check if Materialized is specified (case 
2.c) above).

Another example is `TableProcessorNode`, which is used for 2.d) above: it 
includes this logic, while its caller (`KTableImpl#doFilter`, for example) 
also contains the logic when deciding whether to pass the `queryableName` 
parameter to `KTableProcessorSupplier`.

This is bug-prone, since we may update the logic in one class but forget to 
update the other class.

--------------

What we want is a cleaner code path, similar to what we have for 2.b), 
such that when creating the logical nodes we keep track of whether 1) 
Materialized is specified, and 2) a queryable name is provided. During the 
optimization phase we may change the inner physical ProcessorBuilder's 
parameters, such as the queryable name, and when it is time to generate the 
physical node we can simply take the parameters as they are.
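
As a rough illustration of this proposal, the sketch below shows a logical node that records both facts, lets an optimization step change the decision, and hands fixed parameters to physical plan generation. All names here (`LogicalTableNode`, `PhysicalParams`, `requireMaterialization`, `toPhysicalParams`, `internalStoreName`) are assumptions made for the example; they are not existing Kafka Streams classes or the eventual design.

```java
// Hypothetical sketch: class and method names are illustrative only.
final class PhysicalParams {
    final String storeName;     // null means no state store is attached
    final String queryableName; // null means the store is not queryable

    PhysicalParams(String storeName, String queryableName) {
        this.storeName = storeName;
        this.queryableName = queryableName;
    }
}

final class LogicalTableNode {
    private final boolean materializedSpecified; // 1) did the user pass Materialized?
    private final String queryableName;          // 2) queryable name, or null
    private boolean materialize;                 // may be changed during optimization

    LogicalTableNode(boolean materializedSpecified, String queryableName) {
        this.materializedSpecified = materializedSpecified;
        // A queryable name can only come from a Materialized object.
        this.queryableName = materializedSpecified ? queryableName : null;
        // Start from rule 1; other rules or the optimizer may turn this on later.
        this.materialize = this.queryableName != null;
    }

    boolean isMaterializedSpecified() {
        return materializedSpecified;
    }

    /** Optimization phase: e.g. a downstream operator needs old values. */
    void requireMaterialization() {
        materialize = true;
    }

    /** Physical plan generation: takes the decision as-is, no re-derivation. */
    PhysicalParams toPhysicalParams(String internalStoreName) {
        if (!materialize) {
            return new PhysicalParams(null, null);
        }
        String storeName = queryableName != null ? queryableName : internalStoreName;
        return new PhysicalParams(storeName, queryableName);
    }
}
```

With this shape, the physical builder never asks whether Materialized was specified; it only reads `storeName` and `queryableName` from the parameters it is given.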


  was:
Today, since we pre-create all the `KTableXXX` operators along with the logical 
node, we are effectively duplicating the logic that determines whether the 
resulting KTable should be materialized. More specifically, the materialization 
principle today is:

1) If users specify Materialized in the DSL and it contains a queryable name, 
we always materialize.
2) If users specify Materialized in the DSL but it does not contain a queryable 
name, or if users do not specify a Materialized object at all, Streams may 
choose to materialize or not. In either case, even if the KTable is 
materialized it will not be queryable since there is no queryable name (i.e. 
only storeName is not null, but queryableName is null):
2.a) If the resulting KTable is from an aggregation, we always materialize since 
the store is needed for storing the aggregation (i.e. we use the 
MaterializedInternal constructor with nameProvider != null).
2.b) If the resulting KTable is from a source topic, we delay the 
materialization until a downstream operator requires this KTable to be 
materialized or to send old values (see `KTableSourceNode` and `KTableSource`).
2.c) If the resulting KTable is from a join, we always materialize. This could 
be optimized similarly to 2.b), but that is orthogonal to this ticket (see 
`KTableImpl#buildJoin`, where we always use the constructor with nameProvider != 
null).
2.d) If the resulting KTable is from a stateless operation like filter / 
mapValues, we never materialize.

------------

Now, in all of these cases, we have a logical node like `KTableKTableJoinNode` 
as well as a physical node like `ProcessorNode`. Ideally we should always create 
the logical plan (i.e. the StreamsGraph), then optimize it if necessary, 
and then generate the physical plan (i.e. the Topology). However, today we 
create some physical nodes beforehand, and the above logic is hence duplicated 
in the creation of both physical nodes and logical nodes. For example, in 
`KTableKTableJoinNode` we check if Materialized is null before adding a state 
store, and in `KTableImpl#doJoin` we check if Materialized is specified (case 
2.c) above).

Another example is `TableProcessorNode`, which is used for 2.d) above: it 
includes this logic, while its caller (`KTableImpl#doFilter`, for example) 
also contains the logic when deciding whether to pass the `queryableName` 
parameter to `KTableProcessorSupplier`.

This is bug-prone, since we may update the logic in one class but forget to 
update the other class.

--------------

What we want is a cleaner code path, similar to what we have for 2.b), 
such that when creating the logical nodes we keep track of whether 1) 
Materialized is specified, and 2) a queryable name is provided. During the 
optimization phase we may change the inner physical ProcessorBuilder's 
parameters, such as the queryable name, and when it is time to generate the 
physical node we can simply take the parameters as they are.



> Cleanup KTable materialization logic in a single place
> ------------------------------------------------------
>
>                 Key: KAFKA-7502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7502
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Lee Dongjin
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
