[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-6398:
---------------------------------
Description:
For any operator that generates a KTable, its {{valueGetterSupplier}} has three code paths:

1. If the operator is a KTable source operator, it uses its materialized state store for the value getter (note that currently we always materialize on KTable source).
2. If the operator is an aggregation operator, its generated KTable should always be materialized, so we just use its materialized state store.
3. Otherwise, we treat the value getter on a per-operator basis.

For 3) above, what we SHOULD do is the following: if the generated KTable is materialized, the value getter just relies on its materialized state store to get the value; otherwise, we rely on the operator itself to define which parent's value getter to inherit and what computational logic to apply on the fly to get the value. For example, for {{KTable#filter()}} where {{Materialized}} is not specified, {{KTableFilterValueGetter}} just gets the value from the parent's value getter and then applies the filter on the fly; in addition, we should let downstream operators access the parent's materialized state store via {{connectProcessorAndStateStore}}.

However, the current code does not do this correctly: it 1) does not check whether the resulting KTable is materialized, but always tries to use its parent's value getter, and 2) does not connect its parent's materialized store to the downstream operator. As a result, operators such as {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} result in a TopologyException when building the topology.
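To make the intended behavior for path 3 concrete, here is a minimal, self-contained Java model. It is only a sketch under stated assumptions: {{ValueGetterSketch}}, {{TableNode}}, {{SourceNode}}, {{FilterNode}}, and {{storeNamesForGetter()}} are hypothetical names and are not the Kafka Streams API. The model shows two rules. A materialized table reads its own store. A non-materialized filter inherits its parent's getter, applies the predicate on the fly, and reports the parent's store names so that a downstream processor can be connected to them (the role {{connectProcessorAndStateStore}} plays in the description). Aggregations (path 2) would behave like the source in this model.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical model of KTable value-getter resolution; none of these types exist in Kafka Streams.
public class ValueGetterSketch {

    // Reads the current value for a key, either from a store or by computing it on the fly.
    interface ValueGetter {
        Optional<String> get(String key);
    }

    static abstract class TableNode {
        final String storeName; // null means "not materialized"
        // Stand-in for the materialized state store; populating it is out of scope here.
        final Map<String, String> store = new HashMap<>();

        TableNode(String storeName) {
            this.storeName = storeName;
        }

        boolean isMaterialized() {
            return storeName != null;
        }

        // Materialized tables (paths 1, 2, and materialized path 3) read their own store;
        // otherwise the operator decides how to compute the value (path 3).
        ValueGetter valueGetter() {
            if (isMaterialized()) {
                return key -> Optional.ofNullable(store.get(key));
            }
            return computedGetter();
        }

        // Stores a downstream processor (e.g. a stream-table join) must be connected to
        // before it can use valueGetter(): the own store if materialized, else the parent's.
        List<String> storeNamesForGetter() {
            return isMaterialized() ? Collections.singletonList(storeName) : parentStoreNames();
        }

        abstract ValueGetter computedGetter();

        abstract List<String> parentStoreNames();
    }

    // Path 1: a source table, always materialized in this model.
    static class SourceNode extends TableNode {
        SourceNode(String storeName) {
            super(storeName);
        }

        @Override
        ValueGetter computedGetter() {
            throw new IllegalStateException("a source table must be materialized");
        }

        @Override
        List<String> parentStoreNames() {
            return Collections.emptyList();
        }
    }

    // Path 3: a filter that may or may not be materialized.
    static class FilterNode extends TableNode {
        final TableNode parent;
        final Predicate<String> predicate;

        FilterNode(TableNode parent, Predicate<String> predicate, String storeName) {
            super(storeName);
            this.parent = parent;
            this.predicate = predicate;
        }

        // Not materialized: inherit the parent's getter and apply the filter on the fly.
        @Override
        ValueGetter computedGetter() {
            ValueGetter parentGetter = parent.valueGetter();
            return key -> parentGetter.get(key).filter(predicate);
        }

        // ...and expose whatever stores the parent's getter reads, so they can be connected.
        @Override
        List<String> parentStoreNames() {
            return parent.storeNamesForGetter();
        }
    }
}
```

In this model, a non-materialized filter over a source table with store {{source-store}} reports {{[source-store]}}, so a downstream join would be connected to the source's store instead of to a {{null}} store name, which appears to be what the "StateStore null is not added yet" error in the example stems from. A filter materialized as {{filter-store}} reports only its own store and reads from it, rather than silently delegating to the source's getter (the "has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000" failure). Because {{storeNamesForGetter()}} recurses, chained non-materialized operators resolve to the nearest materialized ancestor.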
The following is an example:
----
Using a non-materialized KTable in a stream-table join fails:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...);
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}
fails with
{noformat}
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet.
    at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
    at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
{noformat}
Adding a store name is not a sufficient workaround; it fails differently:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}
error:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-0000000005
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
    at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
    at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-0000000005 has no access to StateStore KTABLE-SOURCE-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
    at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
    at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
{noformat}
One can work around this by piping the result through a topic:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}
----
Note that there is another minor orthogonal issue of {{KTable#filter}} itself that

> Non-aggregation KTable generation operator does not construct value getter correctly
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.1, 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Critical
>              Labels: bug