[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee Dongjin updated KAFKA-7502:
-------------------------------
    Priority: Major  (was: Minor)

> Cleanup KTable materialization logic in a single place
> ------------------------------------------------------
>
>                 Key: KAFKA-7502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7502
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Lee Dongjin
>            Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the
> logical nodes, we are effectively duplicating the logic that determines
> whether the resulting KTable should be materialized. More specifically, the
> materialization principle today is:
> 1) If users specify Materialized in the DSL and it contains a queryable
> name, we always materialize.
> 2) If users specify Materialized in the DSL but it does not contain a
> queryable name, or if users do not specify a Materialized object at all,
> Streams may choose to materialize or not. In either case, even if the KTable
> is materialized it will not be queryable, since there is no queryable name
> (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize,
> since the store is needed for the aggregation (i.e. we use the
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the
> materialization until a downstream operator requires this KTable to be
> materialized or to send old values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize. This
> could be optimized similarly to 2.b), but that is orthogonal to this ticket
> (see `KTableImpl#buildJoin`, where we always use the constructor with
> nameProvider != null).
> 2.d) If the resulting KTable is from a stateless operation like filter /
> mapValues, we never materialize.
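The rules quoted above can be read as one decision function. The following is only an illustrative sketch of that reading, not code from Kafka Streams: the class `MaterializationRules`, the enums `Origin` and `Decision`, and the method `decide` are hypothetical names introduced here.

```java
import java.util.Optional;

/** Hypothetical illustration of rules 1) and 2.a)-2.d); not Kafka Streams code. */
class MaterializationRules {

    /** Where the resulting KTable comes from (assumed categories, one per sub-case). */
    enum Origin { AGGREGATION, SOURCE_TOPIC, JOIN, STATELESS }

    /** Outcome of the decision described in the ticket. */
    enum Decision { ALWAYS, DELAYED, NEVER }

    /** Decide in a single place, so no other class has to repeat the check. */
    static Decision decide(Origin origin, Optional<String> queryableName) {
        // Rule 1: a queryable name always forces materialization.
        if (queryableName.isPresent()) {
            return Decision.ALWAYS;
        }
        // Rule 2: no queryable name, so the outcome depends on the operator.
        switch (origin) {
            case AGGREGATION:  return Decision.ALWAYS;   // 2.a: store needed for aggregation
            case SOURCE_TOPIC: return Decision.DELAYED;  // 2.b: only if downstream requires it
            case JOIN:         return Decision.ALWAYS;   // 2.c: current behaviour
            case STATELESS:    return Decision.NEVER;    // 2.d: filter / mapValues
            default: throw new IllegalArgumentException("unknown origin: " + origin);
        }
    }

    /** A store is queryable only when a queryable name was given. */
    static boolean isQueryable(Optional<String> queryableName) {
        return queryableName.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(decide(Origin.STATELESS, Optional.of("my-store")));
        System.out.println(decide(Origin.SOURCE_TOPIC, Optional.empty()));
    }
}
```

Keeping the whole table in one method is the point of the sketch: a change to any sub-case then touches exactly one place.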
> ------------
> Now, in all of these cases, we have a logical node like
> `KTableKTableJoinNode`, as well as a physical node like `ProcessorNode`.
> Ideally we should always create the logical plan (i.e. the StreamsGraph),
> then optimize it if necessary, and then generate the physical plan (i.e. the
> Topology). However, today we create some physical nodes beforehand, and the
> above logic is hence duplicated in the creation of both physical nodes and
> logical nodes. For example, in `KTableKTableJoinNode` we check if
> Materialized is null for adding a state store, and in `KTableImpl#doJoin` we
> check if materialized is specified (case 2.c above).
> Another example is `TableProcessorNode`, which is used for 2.d) above: it
> includes the logic, while its caller, `KTableImpl#doFilter` for example,
> also contains the logic when deciding whether to pass the `queryableName`
> parameter to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget
> to update the other class.
> --------------
> What we want is a cleaner code path similar to what we have for 2.b),
> such that when creating the logical nodes we keep track of whether 1)
> materialized is specified, and 2) a queryable name is provided. During the
> optimization phase, we may change the inner physical ProcessorBuilder's
> parameters, such as the queryable name, and when it is time to generate the
> physical node, we can just blindly take the parameters and go for it.
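One possible shape for the proposed code path is sketched below. It assumes a logical node that records the two facts named in the ticket and a physical step that only copies them; `LogicalTableNode`, `PhysicalParams`, `requireMaterialization` and `toPhysical` are hypothetical names and do not correspond to existing Kafka Streams classes or methods.

```java
/** Hypothetical logical node; it records user intent and nothing physical. */
class LogicalTableNode {
    private final boolean materializedSpecified; // 1) did the user pass Materialized?
    private final String queryableName;          // 2) null when no queryable name was given
    private boolean materialize;                 // may be flipped during optimization

    LogicalTableNode(boolean materializedSpecified, String queryableName) {
        this.materializedSpecified = materializedSpecified;
        this.queryableName = queryableName;
        // A queryable name always forces materialization (rule 1 in the ticket).
        this.materialize = queryableName != null;
    }

    boolean isMaterializedSpecified() {
        return materializedSpecified;
    }

    /** Optimization phase: a downstream operator needs this table's store. */
    void requireMaterialization() {
        this.materialize = true;
    }

    /** Physical phase: blindly copy the parameters; no decision logic here. */
    PhysicalParams toPhysical(String generatedStoreName) {
        String storeName = null;
        if (materialize) {
            // Without a queryable name the store gets an internal name only.
            storeName = (queryableName != null) ? queryableName : generatedStoreName;
        }
        return new PhysicalParams(storeName, queryableName);
    }
}

/** Hypothetical parameter bag handed to the physical node builder. */
final class PhysicalParams {
    final String storeName;      // null means "do not materialize"
    final String queryableName;  // null means "not queryable"

    PhysicalParams(String storeName, String queryableName) {
        this.storeName = storeName;
        this.queryableName = queryableName;
    }
}
```

In this sketch a source table created without a queryable name starts unmaterialized; after a call to `requireMaterialization()` it ends up with a non-null `storeName` and a null `queryableName`, which matches the "materialized but not queryable" state described for case 2).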